rust 支持websocket的cgi

特点

  • 实现了 websocket 通讯,勉强能用

代码如下

use std::collections::HashMap;
use std::io;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::net::{TcpListener, TcpStream};
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::sync::RwLock;

use clap::{App, Arg};

use base64::encode;
use sha1::{Digest, Sha1};
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;

struct Req {
    // _handle_script: RwLock<Option<std::process::Child>>,
    _stream_closed: RwLock<bool>,
    _stream: TcpStream,
    headers: HashMap<String, String>,
    reader: RwLock<BufReader<TcpStream>>,
    writer: RwLock<BufWriter<TcpStream>>,
}

impl Req {
    //fn new(reader: &mut BufReader<TcpStream>, writer: &mut BufWriter<TcpStream>) -> Req {
    fn new(stream: TcpStream) -> Req {
        let mut headers: HashMap<String, String> = HashMap::new();
        let mut reader = BufReader::new(stream.try_clone().expect("open readerBuf erro"));
        let mut writer = BufWriter::new(stream.try_clone().expect("open writerBuf erro"));
        //let mut script_handle: Option<std::process::Child> = None;

        let mut buffer = String::new(); //Vec::with_capacity(1024);
        if let Ok(_) = reader.read_line(&mut buffer) {
            let line = buffer.trim_matches(|c| c == '\n' || c == '\r');
            let mut req = line.splitn(3, " ");
            if let Some(req_method) = req.next() {
                headers.insert(String::from("req_method"), String::from(req_method));
            }
            if let Some(req_path) = req.next() {
                let mut path = req_path.splitn(2, "?");
                if let Some(req_path) = path.next() {
                    headers.insert(String::from("req_path"), String::from(req_path));
                }
                if let Some(req_param) = path.next() {
                    headers.insert(String::from("req_param"), String::from(req_param));
                    // get param
                    let mut param = req_param.split("&");
                    while let Some(req_param_item) = param.next() {
                        let mut req_item_kv = req_param_item.splitn(2, "=");
                        if let Some(req_param_name) = req_item_kv.next() {
                            if let Some(req_param_value) = req_item_kv.next() {
                                headers.insert(
                                    format!("req_param_{}", req_param_name),
                                    String::from(req_param_value),
                                );
                            }
                        }
                    }
                }
                headers.insert(String::from("req_body_method"), "HTTP".to_string());
            }
            if let Some(req_version) = req.next() {
                headers.insert(String::from("req_version"), String::from(req_version));
            }
            // Read Header
            buffer.clear();
            while let Ok(_) = reader.read_line(&mut buffer) {
                //println!("> [{}]", buffer);
                let line = buffer.trim_matches(|c| c == '\n' || c == '\r');
                if line.is_empty() {
                    break;
                }
                let mut head = line.splitn(2, ":");
                if let Some(req_head_name) = head.next() {
                    if let Some(req_head_value) = head.next() {
                        headers.insert(
                            String::from(req_head_name),
                            String::from(String::from(req_head_value).trim_start()),
                        );
                    }
                }
                buffer.clear();
            }
            debug!("req header {:?}", headers);
            // handle websocket
            if let Some(upgrade) = headers.get("Upgrade") {
                if upgrade.to_lowercase() == "websocket" {
                    debug!("Req upgrade to Websocket");
                    if let Some(sec_websocket_key) = headers.get("Sec-WebSocket-Key") {
                        let mut hasher = Sha1::new();
                        hasher.update(format!(
                            "{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
                            sec_websocket_key
                        ));
                        let sha1_key = hasher.finalize();
                        let sec_websocket_accept = encode(sha1_key);
                        // switch resp
                        let resp = format!("HTTP/1.1 101 SWITCH\r\nServer: Hawk web\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Accept: {}\r\n\r\n",sec_websocket_accept);
                        if let Ok(_) = writer.write(resp.as_bytes()) {
                            if let Ok(_) = writer.flush() {
                                //websocket
                                headers.insert(
                                    String::from("req_body_method"),
                                    "WEBSOCKET".to_string(),
                                );
                                debug!("Websocket handshake finished");
                            }
                        }
                    }
                }
            } // handle script
        }
        return Req {
            //_stream: RwLock::new(stream),
            _stream: stream,
            _stream_closed: RwLock::new(false),
            // _handle_script: RwLock::new(script_handle),
            headers: headers,
            reader: RwLock::new(reader),
            writer: RwLock::new(writer),
        };
    }

    fn get_current_target(&self) -> String {
        if let Some(req_path) = self.headers.get("req_path") {
            if cfg!(windows) {
                return String::from(req_path).replace("/", "\\");
            }
            //if cfg!(target_os = "macos" ) || cfg!(target_os = "linux") {
            return String::from(req_path);
        }
        return String::new();
    }

    fn read_from(&self, buffer: &mut Vec<u8>) -> io::Result<usize> {
        if let Some(method) = self.headers.get("req_body_method") {
            match method.as_str() {
                "WEBSOCKET" => return self.recv_websocket(buffer),
                "HTTP" => {
                    if let Some(length_s) = self.headers.get("Content-Length") {
                        if let Ok(length) = length_s.parse::<usize>() {
                            if length > 0 {
                                //let mut buffer = [0; 128];
                                buffer.resize(1024, 0);
                                return self.recv(buffer);
                            }
                        }
                    }
                    //return Ok(0);
                    return Err(io::Error::new(io::ErrorKind::Other, "No Content-Lengt"));
                }
                _ => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("Unknow req_body_method {}", method),
                    ))
                }
            }
        }
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "No req_body_method",
        ));
    }

    fn send_to(&self, buffer: &[u8]) -> io::Result<usize> {
        if let Some(method) = self.headers.get("req_body_method") {
            match method.as_str() {
                "WEBSOCKET" => return self.send_websocket(0, buffer),
                "HTTP" => return self.send(buffer),
                _ => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("Unknow req_body_method {}", method),
                    ))
                }
            }
        }
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "No req_body_method",
        ));
    }

    fn send(&self, data: &[u8]) -> io::Result<usize> {
        if self.is_closed() {
            return Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "writer is closed",
            ));
        }
        if let Ok(mut writer) = self.writer.write() {
            return writer.write(data);
        }
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "writer RwLock write Erro",
        ));
    }

    fn close(&self) -> io::Result<()> {
        if let Ok(mut closed) = self._stream_closed.write() {
            if *closed {
                return Ok(());
            }
            let _flush = self.flush();
            if let Ok(_) = _flush {
                let _shutdown = self._stream.shutdown(std::net::Shutdown::Both);
                if let Ok(_) = _shutdown {
                    *closed = true;
                }
                return _shutdown;
            }
            return _flush;
        }
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "writer RwLock write Erro",
        ));
    }

    fn is_closed(&self) -> bool {
        if let Ok(closed) = self._stream_closed.read() {
            if *closed {
                return true;
            }
            return false;
        }
        return true;
    }

    fn recv(&self, data: &mut [u8]) -> io::Result<usize> {
        if self.is_closed() {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "reader Erro"));
        }
        if let Ok(mut reader) = self.reader.write() {
            return reader.read(data);
        }
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "reader RwLock write Erro",
        ));
    }

    fn flush(&self) -> io::Result<()> {
        if let Ok(mut writer) = self.writer.write() {
            return writer.flush();
        }
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "writer RwLock write Erro",
        ));
    }

    fn send_websocket(&self, h1: u8, data: &[u8]) -> io::Result<usize> {
        let mut resp: Vec<u8> = Vec::new();
        let len = data.len();
        //h1 +fin+rsv1+rsv2+rsv3+opcode*4+
        //fin 1末尾包 0还有后续包
        //opcoce 4bit 0附加数据 1文本数据 2二进制数据 3-7保留为控制帧 8链接关闭 9ping apong b-f同3-7
        if h1 > 0 {
            resp.push(h1);
        } else {
            resp.push(0x81);
        }
        //h2 128 for mask bit
        if len < 126 {
            resp.push(len as u8);
        } else {
            if len > 125 && len < (1 << 16) {
                resp.push(126);
                // 2byte
                resp.push((len >> 8) as u8);
                resp.push(len as u8);
            } else {
                if len > (1 << 16) - 1 {
                    resp.push(127);
                    // 8byte
                    (0..8).for_each(|v| resp.push((len >> 8 * (7 - v)) as u8))
                }
            }
        }
        //mask
        //let _mask = [13u8, 9, 78, 108];
        //data
        return match self.send(resp.as_slice()) {
            Ok(_) => self.send(data),
            Err(e) => Err(e),
        };
    }

    //fn recv_websocket(&self,reader:&mut BufReader<TcpStream>,data:&mut Vec<u8>) -> Result<usize,usize>{
    fn recv_websocket(&self, data: &mut Vec<u8>) -> io::Result<usize> {
        //let mut load: Vec<u8> = Vec::new();
        let mut _mask = [0u8; 4];
        let mut _byte = [0u8; 1];
        //byte 1
        if let Ok(_) = self.recv(&mut _byte) {
            //println!(" > websocket byte one {:b}", _byte[0]);
            if 0b10001000 == _byte[0] {
                // ctrl close
                return Ok(0);
            }
            if 0b10001001 == _byte[0] {
                // ctrl ping 0b1000-1010
                self.send_websocket(0b10001010, b"pong").unwrap();
            }
            //byte 2
            if let Ok(_) = self.recv(&mut _byte) {
                //println!(" websocket fram byte 2 {:b}", _byte[0]);
                let _length = match _byte[0] & 0x7f {
                    n if n < 126 => n as usize,
                    n if n == 126 => {
                        //2byte
                        (0..2).fold(0usize, |a, v| {
                            while let Ok(_) = self.recv(&mut _byte) {
                                return a + (_byte[0] as usize) << 8 * (1 - v);
                            }
                            return a;
                        })
                    }
                    n if n == 127 => {
                        //8byte
                        (0..8).fold(0usize, |a, v| {
                            if let Ok(_) = self.recv(&mut _byte) {
                                return a + (_byte[0] as usize) << 8 * (7 - v);
                            }
                            return a;
                        })
                    }
                    _ => 0,
                };
                //println!("play load  len {}", _length);
                //mask 4byte
                if let Ok(_) = self.recv(&mut _mask) {
                    //println!("get mask {:?}", _mask);
                    //get playload
                    data.resize(_length, 0);
                    while let Ok(_) = self.recv(&mut data[.._length]) {
                        //unmask
                        for i in 0.._length {
                            data[i] = data[i] ^ _mask[i % 4];
                        }
                        return Ok(_length);
                    }
                }
            }
        }
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "websocket read erro",
        ));
    }
}

fn resp_404(req: &Req) {
    let mut body = String::from("HTTP/1.1 404 NOT FOUND\r\n");
    body.push_str("Server: Hawk\r\n");
    body.push_str("\r\n");
    req.send(body.as_bytes()).unwrap();
    if let Err(e) = req.flush() {
        error!("resp 404 flush erro {:?}", e);
    }
}

fn handle(stream: TcpStream) {
    info!("TcpStream handled");
    stream.set_nodelay(true).unwrap();
    //stream.set_read_timeout(Some(Duration::new(60, 0))).unwrap();
    //stream.set_write_timeout(Some(Duration::new(60, 0))).unwrap();
    if let Ok(peer_addr) = stream.peer_addr() {
        info!("Tcp From {}:{}", peer_addr.ip(), peer_addr.port());
        let req = Arc::new(Req::new(stream));
        // Read body

        //let script_spawn = script.spawn();
        if let Some(_) = req.headers.get("req_path") {
            let script_path = format!(".{}", req.get_current_target()); //format!("{}{}", WDIR.read().unwrap(), req.get_current_target());
            info!("Req [{}]", script_path);
            debug!("EXEC [{}]", script_path);
            let mut script = Command::new(&script_path);
            script.current_dir(WDIR.read().unwrap().as_str());
            script.env_clear();
            script
                .envs(&req.headers)
                .stdin(Stdio::piped())
                .stdout(Stdio::piped());
            //
            match script.spawn() {
                Ok(mut child) => {
                    let mut _req_stdin = req.clone();
                    let mut _req_stdout = req.clone();

                    let script_stdin = child.stdin.take();
                    let script_stdout = child.stdout.take(); //= child.stdout;                  //

                    let _stdout_thread = std::thread::spawn(move || {
                        if let Some(mut stdout) = script_stdout {
                            let mut buffer = [0; 128];
                            while let Ok(len) = stdout.read(&mut buffer) {
                                debug!("script stdout read len [{}] [{:?}]", len, String::from_utf8_lossy(&buffer[..len]));
                                if len > 0 {
                                    if let Err(e) = _req_stdout.send_to(&buffer[..len]) {
                                        error!("script stdout send_to {:?}; break", e);
                                        break;
                                    }
                                    if let Err(e) = _req_stdout.flush() {
                                        error!("script stdout read {:?}; break", e);
                                        break;
                                    }
                                } else {
                                    debug!("script stdout read data len 0; break");
                                    break;
                                }
                            }
                        }
                        debug!("script stdout read thread end");
                        debug!("close the tcpStream");
                        //std::thread::sleep(std::time::Duration::new(2,0));
                        if let Err(e) = _req_stdout.close() {
                            error!("script stdout close the tcpStream {:?}; break", e);
                        }
                    });
                    //
                    if let Some(mut stdin) = script_stdin {
                        let mut buffer = Vec::new();
                        //let _read = _req_stdin.read_from(&mut buffer);
                        while let Ok(len) = _req_stdin.read_from(&mut buffer) {
                            //debug!("tcpStream read len [{}]", len);
                            if len > 0 {
                                if let Err(e) = stdin.write(&buffer[..len]) {
                                    error!("script stdin write {:?} break", e);
                                    break;
                                }
                                if let Err(e) = stdin.flush() {
                                    error!("script stdin flush {:?}; break", e);
                                    break;
                                }
                            } else {
                                debug!("tcpstream read data len 0; break");
                                break;
                            }
                        }
                    }
                    // wait thread
                    if let Some(method) = _req_stdin.headers.get("req_body_method") {
                        if method == "HTTP" {
                            if let Err(e) = _stdout_thread.join() {
                                error!("script stdout read thread join erro {:?}", e)
                            }
                        }
                    }
                    //kill spawn
                    if let Err(e) = child.kill() {
                        error!("script stdout thread kill erro {:?}", e)
                    } else {
                        debug!("script stdout thread kill done")
                    }
                }
                Err(e) => {
                    error!("script [{:?}] spawn erro {:?}", script_path, e);
                    resp_404(&req);
                }
            }
        }
        debug!("Req End");
        if let Err(e) = req.close() {
            error!("Tcpstream close erro {:?}", e)
        }
    } else {
        if let Err(e) = stream.shutdown(std::net::Shutdown::Both) {
            error!("Tcpstream close erro {:?}", e)
        }
    }
    info!(" > TcpStream End\n\n");
}

lazy_static! {
    static ref WDIR: RwLock<String> = RwLock::new(String::from("/tmp"));
}

fn main() {
    env_logger::init();
    let matches = App::new("A WebService Program")
        .version("1.0")
        .author("mi78108@live.com>")
        .arg(
            Arg::with_name("workdir")
                .short("w")
                .long("workdir")
                .help("www work dir")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("addr")
                .short("l")
                .long("localaddr")
                .help("bind address")
                .takes_value(true),
        )
        .get_matches();

    if let Some(wd) = matches.value_of("workdir") {
        if let Ok(mut _wwd) = WDIR.write() {
            _wwd.clear();
            _wwd.push_str(wd);
        };
        info!("set workdir [{}]", wd);
    }

    let addr = match matches.value_of("addr") {
        Some(_addr) => _addr,
        None => "0.0.0.0:8080",
    };

    let listener = TcpListener::bind(addr).expect(format!("bind {} erro", addr).as_str());
    info!("Listen on [{}] Work in [{}]", addr, WDIR.read().unwrap());
    for stream in listener.incoming() {
        match stream {
            Ok(_stream) => {
                std::thread::spawn(move || handle(_stream));
                debug!("new Req thread started")
            }
            Err(e) => {
                error!("Tcp handle erro {:?}", e)
            }
        };
    }
}

配合脚本使用

*比如 index

#!/usr/bin/ruby
mark=%Q|<html>
<body>
  <title>hawk rust page</title>
  <div id="time"></div>
  <script>
    let tm = document.getElementById("time")
    setInterval(function () {
      tm.innerHTML = Date()
    }, 1000)
  </script>
</body>

</html>|

if ENV['req_body_method'] == 'WEBSOCKET'
  loop do
    m = STDIN.readline
    puts "recv: #{m}"
  end
else
  puts "HTTP/1.1 200 OK\r\n"
  puts "Content-Type: text/html;charset=utf-8\r\n"
  puts "\r\n"
  puts mark
end
  • 比如 file.rb
    简单实现了文件查看,下载,上传
#!/usr/bin/ruby
require 'pathname'

req_path=ENV['req_param_name']
_file = `file -i #{req_path}`.split(":")
file_name = _file[0]
file_mime = _file[1]


if file_mime.nil? or file_mime.include? 'cannot open'
  puts "HTTP/1.1 404 Not Found\r\n"
  return
end

puts "HTTP/1.1 200 OK\r\n"
if file_mime.include? 'inode/directory'
  if ENV['req_param_opt'] == 'upload'
    if ENV['req_method'].downcase == "get"
      puts "Content-Type: text/html; charset=utf-8\r\n"
      puts "\r\n\r\n"
      puts %Q|<html>
        <title>hawk rust upload file page of [#{req_path}]</title>
        <body>
         <div id="files">
          <form action="/file?name=#{req_path}&opt=upload" method="post" enctype="multipart/form-data">
              <input type="file" name="upload_file"></input>
              <input type="submit" value="Upload" />
          </form>
          </div>
          <script>
          </script>
        </body>
    </html>| 
    return
    end

    if ENV['req_method'].downcase == "post"
      file_len = ENV['Content-Length'].to_i
      write_len = 0
      File::open("/tmp/_upload","w") { |f|
          begin
           write_len += (f.write STDIN.read_nonblock 1024)
           redo
          rescue IO::EAGAINWaitReadable
            f.flush
            break if write_len == file_len
            retry
          rescue EOFError
            break
          end
      }
      puts "Content-Type: text/html; charset=utf-8\r\n"
      puts "\r\n\r\n"
      puts write_len
      return
    end
  end
  puts "Content-Type: text/html; charset=utf-8\r\n"
  puts "\r\n\r\n"
  puts %Q|<html>
  <title>hawk rust file page of [#{req_path}]</title>
  <body>
    <div id="files">
    <div><a href='/file?name=#{Pathname::new(req_path).parent.to_s}' >..</a></div>
    #{`ls #{req_path}`.split("\n").map{|v| "<div><a href='/file?name=#{req_path}/#{v}' >#{v}</a></div>"}.join(" ")}
    </div>
    <script>
    </script>
  </body>
  </html>|
  return
end

if ENV['req_param_opt'] == 'download'
  _body = "#{`cat #{file_name}`}"
  puts "Content-Type: application/download\r\n"
  puts "Content-Disposition: attachment; filename=#{file_name}\r\n\r\n"
  puts "Content-Length: #{_body.length}\r\n\r\n"
  puts _body
  return
end

puts "Content-Type:#{file_mime}\r\n\r\n"
puts "#{`cat #{file_name}`}"
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容