Browse Source

added a super simple HTTP server

typed
Weird Constructor 6 months ago
parent
commit
d9c5e9af50
  1. 3
      Cargo.toml
  2. 68
      doc/wlambda_reference.md
  3. 43
      src/stdlib/helpers.rs
  4. 294
      src/stdlib/http.rs
  5. 2
      src/stdlib/mod.rs
  6. 43
      src/stdlib/odbc.rs

3
Cargo.toml

@ -17,7 +17,7 @@ default = [
"rmp-serde", "toml", "quick-xml", "socket2", "base64", "flate2"
]
mqtt = ["rumqttd", "rumqttc"]
http = ["reqwest"]
http = ["reqwest", "rouille"]
cursive = ["dep:cursive", "cursive_buffered_backend", "unicode-width"]
odbc = ["odbc-api", "odbc-sys"]
clipboard = ["copypasta"]
@ -39,6 +39,7 @@ mint = { version = "0.5.5", optional = true }
quick-xml = { version = "0.27.1", optional = true }
socket2 = { version = "0.5.1", optional = true, features = ["all"] }
reqwest = { version = "0.11.9", optional = true, features = ["blocking"] }
rouille = { version = "3.6.1", optional = true }
rumqttc = { version = "0.20.0", optional = true }
rumqttd = { version = "0.14.0", optional = true }
cursive = { version = "0.20.0", optional = true, features = ["crossterm-backend"], default-features = false }

68
doc/wlambda_reference.md

@ -9002,7 +9002,7 @@ std:assert_str_eq
### <a name="129-http-client"></a>12.9 - HTTP Client
WLambda offers an optional integrated HTTP client by enabling the `reqwest`
WLambda offers an optional integrated HTTP client by enabling the `reqwest` or `http`
feature at compile time. With this you can create a new client using `std:http:client:new` and make HTTP requests using `std:http:get`, `std:http:post` and `std:http:request`. Also support for basic authentication and token based bearer authentication
is there.
@ -9122,6 +9122,72 @@ std:assert_str_eq body.json $[ $["x", 10], ${ y = 20 } ];
std:assert_eq response.status 200;
```
### <a name="129-http-server"></a>12.9 - HTTP Server
WLambda offers an optional integrated HTTP server by enabling the `http`
feature at compile time. With this you can create a new server using `std:http:server:new`.
#### - std:http:server:new _endpoint-string_ -> $\<HttpServer\> | $error
This creates a new http server handle. If you drop that handle, meaning if you remove the last
reference to it the HTTP server will be stopped.
You can use the following methods to handle requests:
- `$<HttpServer>.try_respond _respond-func_` to check quickly if some request can be answered.
- `$<HttpServer>.wait_respond _respond-func_` to wait indefinitely until a request arrives
- `$<HttpServer>.timeout_respond _duration_ _respond-func_` to wait the specified _duration_ until a request arrives.
All three methods call the _respond-func_ when a new request arrives. The first argument is a request
map with the following keys:
- `:url` contains the URL (no query, unescaped)
- `:method` contains the HTTP method
- `:body` contains the request body bytes
Here is a more elaborate example to get you started. It also starts a client request, for
stopping the code (as it is executed by the test suite):
```wlambda
!srv = std:http:server:new "localhost:28080";
!client_thread = std:thread:spawn $code{
!@import std;
!@wlambda;
std:thread:sleep :ms => 500;
!cl = std:http:client:new[];
!resp = std:http:get cl "http://localhost:28080/quit";
std:displayln resp;
resp.status
};
!running = $true;
while running {
srv.timeout_respond :ms => 1000 {!(req) = @;
std:displayln "Req " req.method req.url;
if req.url == "/" {
return :redirect => "/files/xxx.txt";
};
if req.url == "/quit" {
.running = $false;
return $[:data, "application/json", std:ser:json $["quit"]];
};
if req.url &> $r($^\/files\/) {
$[:file, "/files", "/var/www/"]
} {
$[:data, "application/json", std:ser:json $["ok", 1, 2, 3]]
};
};
std:displayln ":tick:";
};
std:assert_eq client_thread.join[] 200;
```
### <a name="1210-operating-system-utilities"></a>12.10 - Operating System Utilities
Some operating system utility functions. Some might only be available if

43
src/stdlib/helpers.rs

@ -0,0 +1,43 @@
use crate::{threads::AVal, VVal};
use std::sync::{Condvar, Mutex};
#[allow(dead_code)]
pub struct PendingResult {
lock: Mutex<(bool, AVal)>,
cvar: Condvar,
}
impl PendingResult {
pub fn new() -> Self {
Self { lock: Mutex::new((true, AVal::None)), cvar: Condvar::new() }
}
pub fn send(&self, res: &VVal) -> Result<(), String> {
match self.lock.lock() {
Ok(mut pend) => {
pend.0 = false;
pend.1 = AVal::from_vval(&res);
self.cvar.notify_one();
Ok(())
}
Err(e) => Err(format!("PendingResult thread send error: {}", e)),
}
}
pub fn wait(&self) -> Result<VVal, String> {
let lock = match self.lock.lock() {
Ok(lock) => lock,
Err(e) => {
return Err(format!("PendingResult thread lock error: {}", e));
}
};
match self.cvar.wait_while(lock, |pend| pend.0) {
Ok(pend) => Ok(pend.1.to_vval()),
Err(e) => {
return Err(format!("PendingResult thread wait error: {}", e));
}
}
}
}

294
src/stdlib/http.rs

@ -17,6 +17,17 @@ use std::cell::RefCell;
#[cfg(feature = "reqwest")]
use std::rc::Rc;
#[cfg(feature = "rouille")]
use super::PendingResult;
#[cfg(feature = "rouille")]
use rouille::{Response, Server};
#[cfg(feature = "rouille")]
use std::sync::mpsc::{Receiver, Sender};
#[cfg(feature = "rouille")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "rouille")]
use std::thread::JoinHandle;
#[cfg(feature = "reqwest")]
#[derive(Debug, Clone)]
struct VHttpClient {
@ -218,6 +229,275 @@ fn add_dump(out: VVal, dump: VVal) -> VVal {
}
}
#[cfg(feature = "rouille")]
#[derive(Debug, Clone)]
struct PendingHttpRequest {
url: String,
method: String,
body: Vec<u8>,
}
#[cfg(feature = "rouille")]
struct HttpServer {
request_receiver: Receiver<(PendingHttpRequest, Arc<PendingResult>)>,
quit_sender: Sender<()>,
thread_join: Option<JoinHandle<()>>,
}
#[cfg(feature = "rouille")]
impl HttpServer {
pub fn start(listen_ip_port: &str) -> Result<Self, String> {
let (sender, receiver) = std::sync::mpsc::channel();
let sender = Arc::new(Mutex::new(sender));
match Server::new(listen_ip_port, move |request| {
if request.url() == "/favicon.ico" {
Response::from_data(
"image/x-icon",
include_bytes!("../../res/wlambda_logo_60.ico").to_vec(),
)
} else {
use std::io::Read;
let pending = Arc::new(PendingResult::new());
let mut data = match request.data() {
Some(data) => data,
None => {
return Response::text("Failed to get body data").with_status_code(500);
}
};
let mut buf = Vec::new();
match data.read_to_end(&mut buf) {
Ok(_) => (),
Err(_) => return Response::text("Failed to read body").with_status_code(500),
};
let p_request = PendingHttpRequest {
body: buf,
method: request.method().to_string(),
url: request.url().to_string(),
};
match sender.lock() {
Ok(locked_sender) => match locked_sender.send((p_request, pending.clone())) {
Ok(_) => (),
Err(e) => {
return Response::text(format!(
"Failed to send request to WLambda: {}",
e
))
.with_status_code(500)
}
},
Err(e) => {
return Response::text(format!("Internal Mutex Error: {}", e))
.with_status_code(500)
}
}
match pending.wait() {
Ok(res) => {
let resp_type = res.v_s_raw(0);
match &resp_type[..] {
"file" => {
let prefix = res.v_(1).s_raw();
let path = res.v_(2).s_raw();
if let Some(req) = request.remove_prefix(&prefix) {
rouille::match_assets(&req, &path)
} else {
Response::text(format!("Invalid file response: {}", res.s()))
.with_status_code(500)
}
}
"redirect" => Response::redirect_303(res.v_(1).s_raw()),
"data" => res.v_(1).with_s_ref(|conttype| {
res.v_(2)
.with_bv_ref(|bv| Response::from_data(conttype.to_string(), bv))
}),
_ => Response::text(format!("Unknown Response Type: {}", res.s())),
}
}
Err(e) => Response::text(format!("Pending request not answered: {}", e))
.with_status_code(500),
}
}
}) {
Ok(server) => {
let (handle, sender) = server.stoppable();
Ok(Self {
request_receiver: receiver,
thread_join: Some(handle),
quit_sender: sender,
})
}
Err(e) => Err(format!("{}", e)),
}
}
}
#[cfg(feature = "rouille")]
impl Drop for HttpServer {
#[allow(unused_must_use)]
fn drop(&mut self) {
if let Some(handle) = self.thread_join.take() {
self.quit_sender.send(()).expect("Sending http:server end signal works on drop");
handle.join().expect("Joining the http:server thread works on drop");
}
}
}
#[cfg(feature = "rouille")]
#[derive(Clone)]
struct VHttpServer {
srv: Rc<RefCell<HttpServer>>,
}
#[cfg(feature = "rouille")]
macro_rules! assert_arg_count {
($self: expr, $argv: expr, $count: expr, $function: expr, $env: ident) => {
if $argv.len() != $count {
return Err(StackAction::panic_str(
format!("{}.{} expects {} arguments", $self, $function, $count),
None,
$env.argv(),
));
}
};
}
#[cfg(feature = "rouille")]
fn handle_request(
env: &mut Env,
fun: VVal,
pend_req: PendingHttpRequest,
pending_response: Arc<PendingResult>,
) -> Result<VVal, StackAction> {
let req = VVal::map3(
"url",
VVal::new_str_mv(pend_req.url),
"method",
VVal::new_str_mv(pend_req.method),
"body",
VVal::new_byt(pend_req.body),
);
match fun.call(env, &[req.clone()]) {
Ok(val) => match pending_response.send(&val) {
Ok(()) => Ok(req),
Err(e) => {
Ok(env.new_err(format!("http:server:try_respond error on responding: {}", e)))
}
},
Err(StackAction::Return(val)) => match pending_response.send(&val.1) {
Ok(()) => Ok(req),
Err(e) => {
Ok(env.new_err(format!("http:server:try_respond error on responding: {}", e)))
}
},
Err(StackAction::Break(val)) => {
let _ = pending_response.send(&VVal::None);
Ok(val.as_ref().clone())
}
Err(StackAction::Next) => {
let _ = pending_response.send(&VVal::None);
Ok(VVal::None)
}
Err(panic) => Err(panic),
}
}
#[cfg(feature = "rouille")]
impl VValUserData for VHttpServer {
fn s(&self) -> String {
format!("$<HttpServer>")
}
fn as_any(&mut self) -> &mut dyn std::any::Any {
self
}
fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
let argv = env.argv();
match key {
"wait_respond" => {
assert_arg_count!(
"$<HttpServer>",
argv,
1,
"wait_respond[responder_function]",
env
);
match self.srv.borrow_mut().request_receiver.recv() {
Ok((pend_req, pending_response)) => {
return handle_request(env, argv.v_(0), pend_req, pending_response);
}
Err(e) => Err(StackAction::panic_str(
format!("$<HttpServer> error waiting for request: {}", e),
None,
env.argv(),
)),
}
}
"timeout_respond" => {
assert_arg_count!(
"$<HttpServer>",
argv,
2,
"timeout_respond[duration, responder_function]",
env
);
let dur = match argv.v_(0).to_duration() {
Ok(dur) => dur,
Err(e) => return Ok(e),
};
match self.srv.borrow_mut().request_receiver.recv_timeout(dur) {
Ok((pend_req, pending_response)) => {
return handle_request(env, argv.v_(1), pend_req, pending_response);
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
Err(StackAction::panic_str(
format!("$<HttpServer> server request thread not running anymore!"),
None,
env.argv(),
))
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Ok(VVal::None),
}
}
"try_respond" => {
assert_arg_count!("$<HttpServer>", argv, 1, "try_respond[responder_function]", env);
match self.srv.borrow_mut().request_receiver.try_recv() {
Ok((pend_req, pending_response)) => {
return handle_request(env, argv.v_(0), pend_req, pending_response);
}
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
Err(StackAction::panic_str(
format!("$<HttpServer> server request thread not running anymore!"),
None,
env.argv(),
))
}
Err(std::sync::mpsc::TryRecvError::Empty) => Ok(VVal::None),
}
}
_ => Err(StackAction::panic_str(
format!("$<HttpServer> unknown method called: {}", key),
None,
env.argv(),
)),
}
}
fn clone_ud(&self) -> Box<dyn VValUserData> {
Box::new(self.clone())
}
}
#[allow(unused_variables)]
pub fn add_to_symtable(st: &mut SymbolTable) {
#[cfg(feature = "reqwest")]
@ -229,6 +509,20 @@ pub fn add_to_symtable(st: &mut SymbolTable) {
false,
);
#[cfg(feature = "rouille")]
st.fun(
"http:server:new",
|env: &mut Env, _argc: usize| match HttpServer::start(
&env.arg(0).s_raw()
) {
Ok(srv) => Ok(VVal::new_usr(VHttpServer { srv: Rc::new(RefCell::new(srv)) })),
Err(e) => Ok(env.new_err(format!("http:server:new Error: {}", e))),
},
Some(1),
Some(1),
false,
);
#[cfg(feature = "reqwest")]
st.fun(
"http:get",

2
src/stdlib/mod.rs

@ -9,6 +9,8 @@ mod cursive;
mod odbc;
mod util;
use super::compiler::*;
mod helpers;
pub use helpers::PendingResult;
pub fn add_to_symtable(st: &mut SymbolTable) {
net::add_to_symtable(st);

43
src/stdlib/odbc.rs

@ -17,6 +17,8 @@ use std::collections::HashMap;
use std::rc::Rc;
#[allow(unused_imports)]
use std::sync::{Arc, Condvar, Mutex};
#[allow(unused_imports)]
use super::PendingResult;
#[cfg(feature = "odbc")]
use odbc_api::{
@ -28,46 +30,6 @@ use odbc_api::{
Connection, Cursor, Environment, IntoParameter, ResultSetMetadata, U16String,
};
#[allow(dead_code)]
struct PendingResult {
lock: Mutex<(bool, AVal)>,
cvar: Condvar,
}
impl PendingResult {
pub fn new() -> Self {
Self { lock: Mutex::new((true, AVal::None)), cvar: Condvar::new() }
}
pub fn send(&self, res: &VVal) -> Result<(), String> {
match self.lock.lock() {
Ok(mut pend) => {
pend.0 = false;
pend.1 = AVal::from_vval(&res);
self.cvar.notify_one();
Ok(())
}
Err(e) => Err(format!("ODBC thread send error: {}", e)),
}
}
pub fn wait(&self) -> Result<VVal, String> {
let lock = match self.lock.lock() {
Ok(lock) => lock,
Err(e) => {
return Err(format!("ODBC thread lock error: {}", e));
}
};
match self.cvar.wait_while(lock, |pend| pend.0) {
Ok(pend) => Ok(pend.1.to_vval()),
Err(e) => {
return Err(format!("ODBC thread wait error: {}", e));
}
}
}
}
#[derive(Debug, Clone)]
enum WParam {
Str(String),
@ -96,6 +58,7 @@ impl WParam {
}
}
#[cfg(feature = "odbc")]
impl IntoParameter for WParam {
type Parameter = Box<dyn InputParameter>;

Loading…
Cancel
Save