rustdesk 深度剖析
背景
由于公司需求远程桌面功能,和作者做了一些沟通,但作者比较忙也没怎么回复我的问题,所以以自己个人能力能去了解一下最近比较火的远程桌面技术 rustdesk 项目
但是整体粗略看下来(个人评价,不代表大众),实现的代码比较像 C++,整体给我的感觉比较乱,而且嵌套很严重,整体面向对象的理念有些偏差,基本上没有任何备注,整体看下来困难重重,本身 rust 特性就非常多,作者也补充了很多,也让我学习了很多,但是真心希望代码能多多优化一下,确实能看出来,时间很紧,也比较体谅作者,后续也希望自己能一起维护下社区代码
中继器目前没有开放出来,开源的 demo 版本很简单,就是告诉大家实现的主流程,具体性能优化需要具体问题具体分析,逻辑的实现其实并不复杂,可以自己来补充,比如针对远程桌面需要实现的远程调用指令以及功能抽象
整体实现逻辑(不对的请提出,大家一起进步)
其实整体看下来,大概后端服务会启动两个,一个负责 config 配置的管理,通过 unix 套接字 进行交互获取配置以及发送指令
另一个服务的话通过 unix 套接字 socket 进行交互管理配置,服务会初始化,并且会启动 音频、视频、剪切板、输入等功能服务的启动操作,并初始化对外的 server 连接并接收来自客户端的连接请求
client 端就是连接上面讲到的 server 的代码逻辑实现, ui.rs 是页面的具体逻辑实现抽象,并调用 client.rs 的相关功能
rustdesk 启动源码分析
main 函数分析
[features]
inline = []
cli = []
- 定义了 cfg features 功能,测功能需要执行以下参数选择编译分支:
cargo build --features="inline"
cargo build --features="cli"
- 生成的二进制文件
ls -als target/debug/odontoceti
- 由于用了 cfg 的 features 所以在编译时会选择匹配的编译分支
- 如果匹配 系统为 android 和 ios 会编译,<u>但是很可惜,这个功能并没有支持</u>
fn main() {
common::test_rendezvous_server();
common::test_nat_type();
#[cfg(target_os = "android")]
crate::common::check_software_update();
mobile::Session::start("");
}
- 如果不匹配 android 和 ios 以及 feature = "cli",这个也是默认以及完全实现了的启动方式
先获取参数:
let mut args = Vec::new();
let mut i = 0;
for arg in std::env::args() {
if i > 0 {
args.push(arg);
}
i += 1;
}
如果第一个参数等于 --version 输出 版本信息:
println!("{}", crate::VERSION);
return;
... ... 忽略旁支
如果参数为空,也是默认的启动项:
// 启动线程
if args.is_empty() {
std::thread::spawn(move || start_server(false, false));
... ...
// 启动 web 服务
ui::start(&mut args[..]);
server 端启动流程分析
- 启动服务 start_server,先确认是否有已经启动的服务端
- 默认无参数并且没有服务端变量
// 默认没有参数会进入这个分支逻辑
else {
// 连接 ipc unix 已经存在的套接字,并且已经有了服务端
// 默认在 /tmp/rustdesk
match crate::ipc::connect(1000, "").await {
返回 result 类型
Ok(mut conn) => {
// 先通过 unix 套接字 发送给服务端一个空的请求尝试下是否可以正常通信
allow_err!(conn.send(&Data::SystemInfo(None)).await);
// 请求配置信息,利用 inner 迭代器获取数据,超时1秒
if let Ok(Some(data)) = conn.next_timeout(1000).await {
log::info!("server info: {:?}", data);
}
// 同步 秘钥 配置,异步监听
// 确认秘钥是否为最新的
if Config::get_key_confirmed() {
... ...
} else {
// 更新秘钥 pair 数据到共享配置
if let Ok(Some(Data::ConfirmedKey(Some(pair)))) =
conn.next_timeout(1000).await
{
Config::set_key_pair(pair);
Config::set_key_confirmed(true);
log::info!("key pair synced");
break;
}
}
}
// 捕获 match 错误,证明没有服务端已经启用,所以执行线程启动服务指令
Err(err) => {
log::info!("{}", err);
std::thread::spawn(|| start_server(true, false));
}
// 实例化 ServerPtr 结构 服务端
pub fn new() -> ServerPtr {
let mut server = Server {
connections: HashMap::new(),
services: HashMap::new(),
id_count: 0,
};
server.add_service(Box::new(audio_service::new()));
server.add_service(Box::new(video_service::new()));
server.add_service(Box::new(clipboard_service::new()));
server.add_service(Box::new(input_service::new_cursor()));
server.add_service(Box::new(input_service::new_pos()));
Arc::new(RwLock::new(server))
}
- 执行真正的初次服务端启动
- 此时 is_server 参数已经修改为 true
// 定义 cfg 宏,target_os 为 linux 执行以下代码,获取变量值
#[cfg(target_os = "linux")]
{
// 在 Linux/Unix 类操作系统上,
// DISPLAY 用来设置将图形显示到何处,
// 直接登陆图形界面或者登陆命令行界面后使用 startx 启动图形
log::info!("DISPLAY={:?}", std::env::var("DISPLAY"));
// 该文件用于将凭据存储在用于 xauthX 会话身份验证的 cookie 中,
// XAUTHORITY 路径 用于验证与该特定显示器的连接
log::info!("XAUTHORITY={:?}", std::env::var("XAUTHORITY"));
}
// 如果 is_server 为 true 执行这段逻辑
if is_server {
// 启动线程,移动 引用所有权 到 闭包内
std::thread::spawn(move || {
// 如果启动时报错退出,调用 unix panic 返回 unix 状态码
if let Err(err) = crate::ipc::start("") {
log::error!("Failed to start ipc: {}", err);
std::process::exit(-1);
}
});
// TODO
crate::RendezvousMediator::start_all().await;
}
ipc 服务代码流程分析
- ipc.rs start 流程分析
// tokio::main 宏 使用 Tokio 默认的单线程运行时
// current_thread runtime flavor 是一个轻量的、单线程 Runtime
// 在只需要创建少量任务并且处理少量套接字的情况下,他是一个不错的选择
// 比如为客户端的异步函数提供一个同步接口的桥梁时,他就能工作的很好
#[tokio::main(flavor = "current_thread")]
pub async fn start(postfix: &str) -> ResultType<()> {
// 新建 套接字 listener
let mut incoming = new_listener(postfix).await?;
loop {
// accept 函数主要用于服务器端
// 初始化 listen 之后,默认会阻塞进程,
// 直到有一个客户请求连接,建立好连接后
// 它返回的一个新的套接字 socketfd_new
// 此后,服务器端即可使用这个新的套接字 socketfd_new 与该客户端进行通信
// 而 sockfd 则继续用于监听其他客户端的连接请求
// accept 返回句柄信息(源IP、源端口号、目的IP、目的端口号)
// incoming.next() 调用 self.socket.accept().await 返回 Result stream 信息
if let Some(result) = incoming.next().await {
match result {
Ok(stream) => {
// 初始化 tokio framed 连接
// Tokio 帮助函数将字节流转换为帧流
// 字节流的例子包括 TCP 连接,管道,文件对象以及标准输入和输出
let mut stream = Connection::new(stream);
let postfix = postfix.to_owned();
// 创建 tokio 线程,迁移所有权到闭包
tokio::spawn(async move {
loop {
// stream 迭代器 Result 类型
match stream.next().await {
Err(err) => {
log::trace!("ipc{} connection closed: {}", postfix, err);
break;
}
Ok(Some(data)) => {
// 服务端处理 stream 数据逻辑
handle(data, &mut stream).await;
}
_ => {}
}
}
});
}
Err(err) => {
log::error!("Couldn't get client: {:?}", err);
}
}
}
}
}
- ipc.rs new_listener 流程分析
// 创建 套接字
// postfix 直接写死在上一级,直接就是空的
pub async fn new_listener(postfix: &str) -> ResultType<Incoming> {
// #[cfg(not(windows))] 如果不是 windows 给予默认的创建 套接字 路径地址
// format!("/tmp/{}", APP_NAME).into();
// 直接(创建目录、设置权限、属主属组、格式化) 并检查是否创建成功,否则报错
let path = Config::ipc_path(postfix);
#[cfg(not(windows))]
// pid
// get_pid_file 创建 pid 文件,并且返回 pid 文件路径 string
// pid 检查其正确性
// 拿到 pid 进程号并创建,以及检查是否已经存在以及创建是否成功
// 检查 pid 进程的属主是否和当前启动 rustdesk 进程属主一致
// pid 进程的属主如果不一致再次尝试检查连接
// 服务关闭后,或者有问题,删除 pid
check_pid(postfix).await;
// 复制 unix 套接字的 path 栈引用
// 初始化 endpoint 结构
let mut endpoint = Endpoint::new(path.clone());
// 属性安全设置
match SecurityAttributes::allow_everyone_create() {
Ok(attr) => endpoint.set_security_attributes(attr),
Err(err) => log::error!("Failed to set ipc{} security: {}", postfix, err),
};
// 设置 unix 套接字目录的权限
// Result 类型 match 值
match endpoint.incoming() {
Ok(incoming) => {
log::info!("Started ipc{} server at path: {}", postfix, &path);
#[cfg(not(windows))]
{
// 设置目录权限,并把 pid 进程号写入 pid 文件
// 并且把 postfix 名字追加到 pid 文件名上
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok();
write_pid(postfix);
}
Ok(incoming)
}
// 错误类型的 直接输出 没啥东西
Err(err) => {
log::error!(
"Faild to start ipc{} server at path {}: {}",
postfix,
path,
err
);
Err(err.into())
}
}
}
- ipc.rs handle 流程分析
// handle 主要功能是通过 unix 套接字连接过来的客户端
// 需要请求的 配置,并由 ipc 服务 send 给客户端所需要的配置
// Data 是所有配置的数据 rust 枚举 类型,这个是服务端返回给客户端的配置数据
// stream 是所需要发送的 connection 的实例化
async fn handle(data: Data, stream: &mut Connection) {
match data {
// 如果是需要 Data::SystemInfo(_) 类型的数据
// 那就 send 共享的数据给客户端
Data::SystemInfo(_) => {
let info = format!(
"log_path: {}, config: {}, username: {}",
Config::log_path().to_str().unwrap_or(""),
Config::file().to_str().unwrap_or(""),
crate::username(),
);
allow_err!(stream.send(&Data::SystemInfo(Some(info))).await);
}
// 如果是 close 指令数据,直接关闭服务,退出 返回 unix exit 状态码
Data::Close => {
log::info!("Receive close message");
std::process::exit(0);
}
// 这里会获取 中继器的 地址以及延迟
// config::ONLINE 这个配置会在 启动
// crate::RendezvousMediator::start_all().await 时
// 会初始化所有配置数据,包括 hashmap{"中继器地址", "延迟时间"}
// 之后会在这里看到配置信息,更新全局配置并通过 unix 套接字 发送给 其他客户端
Data::OnlineStatus(_) => {
let x = config::ONLINE
.lock()
.unwrap()
.values()
.max()
.unwrap_or(&0)
.clone();
let confirmed = Config::get_key_confirmed();
// 配置秘钥信息以及中继器 服务信息 全部发送给 其他客户端
allow_err!(stream.send(&Data::OnlineStatus(Some((x, confirmed)))).await);
}
Data::ConfirmedKey(None) => {
let out = if Config::get_key_confirmed() {
Some(Config::get_key_pair())
} else {
None
};
// 自己的秘钥信息 发送给 其他客户端
allow_err!(stream.send(&Data::ConfirmedKey(out)).await);
}
// 捕获 name、value 数据, 按照 value Option 类型进行判定
Data::Config((name, value)) => match value {
// 如果 value 为 None
None => {
let value;
if name == "id" {
// 获取 id
value = Some(Config::get_id());
// 获取密码
} else if name == "password" {
value = Some(Config::get_password());
// 获取 salt 约定秘钥
} else if name == "salt" {
value = Some(Config::get_salt());
// 获取 中继器服务 地址
} else if name == "rendezvous_server" {
value = Some(Config::get_rendezvous_server().to_string());
} else {
value = None;
}
allow_err!(stream.send(&Data::Config((name, value))).await);
}
// 如果存在 value 数据
Some(value) => {
// 判定 key 为 id 插入数据到 value
if name == "id" {
Config::set_id(&value);
// 判定 key 为 password 插入数据到 value
} else if name == "password" {
Config::set_password(&value);
// 判定 key 为 salt 插入数据到 value
} else if name == "salt" {
Config::set_salt(&value);
} else {
return;
}
log::info!("{} updated", name);
}
},
// Option 主要是通过发送 参数指令 告诉 服务端
// 停止服务,或其他指令(目前未看到实现)
Data::Options(value) => match value {
None => {
let v = Config::get_options();
allow_err!(stream.send(&Data::Options(Some(v))).await);
}
Some(value) => {
Config::set_options(value);
}
},
// 网络连接 类型
// 目前有三种
// 默认采用中继器网络类型 SYMMETRIC
// 一共有两种类型 ASYMMETRIC、SYMMETRIC
Data::NatType(_) => {
let t = Config::get_nat_type();
allow_err!(stream.send(&Data::NatType(Some(t))).await);
}
_ => {}
}
}
对外启动聚合引擎服务代码分析
- crate::RendezvousMediator::start_all() 聚合服务流程分析
impl RendezvousMediator {
pub async fn start_all() {
let mut nat_tested = false;
// 检查当前的进程是否可以正常创建线程
// 如果正常创建 正常 调用 drop 休眠 100 毫秒
check_zombie();
// 初始化服务端
// 并增加所需要启动的子服务
// 初始化并启动 audio_service 音频服务
// 初始化并启动 video_service 视频服务
// 初始化并启动 clipboard_service 剪切板服务
// 初始化并启动 input_service -> MouseCursorService 鼠标服务
// 初始化并启动 input_service -> GenericService 通用服务
let server = new_server();
// 当前的网络模式是否是未知的
// 如果是未知网络就进行 test_nat_type
// 判断当前网络是什么类型的(公共中继器,还是直接做穿透)这块的理解不一定准确
if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
crate::common::test_nat_type();
nat_tested = true;
}
loop {
// 重置 ONLINE HashMap 共享的连接信息配置信息
Config::reset_online();
// 如果配置选项里的发送指令 stop-service 为空
if Config::get_option("stop-service").is_empty() {
if !nat_tested {
crate::common::test_nat_type();
nat_tested = true;
}
let mut futs = Vec::new();
// 获取默认中继器的配置信息,默认是 rustdesk 的香港一组服务器
// ["rs-ny.rustdesk.com", "rs-sg.rustdesk.com", "rs-cn.rustdesk.com"]
let servers = Config::get_rendezvous_servers();
for host in servers.clone() {
let server = server.clone();
let servers = servers.clone();
// 创建线程
futs.push(tokio::spawn(async move {
allow_err!(Self::start(server, host, servers).await);
}));
}
// 等待所有线程停止
join_all(futs).await;
}
//
sleep(1.).await;
}
}
pub async fn start(
server: ServerPtr,
host: String,
rendezvous_servers: Vec<String>,
) -> ResultType<()> {
log::info!("start rendezvous mediator of {}", host);
let host_prefix: String = host
.split(".")
.next()
.map(|x| {
if x.parse::<i32>().is_ok() {
host.clone()
} else {
x.to_string()
}
})
.unwrap_or(host.to_owned());
let mut rz = Self {
addr: Config::get_any_listen_addr(),
host: host.clone(),
host_prefix,
rendezvous_servers,
last_id_pk_registery: "".to_owned(),
};
allow_err!(rz.dns_check());
// 通过 socket 连接服务端
let mut socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
const TIMER_OUT: Duration = Duration::from_secs(1);
let mut timer = interval(TIMER_OUT);
let mut last_timer = SystemTime::UNIX_EPOCH;
const REG_INTERVAL: i64 = 12_000;
const REG_TIMEOUT: i64 = 3_000;
const MAX_FAILS1: i64 = 3;
const MAX_FAILS2: i64 = 6;
const DNS_INTERVAL: i64 = 60_000;
let mut fails = 0;
// 注册 unix 套接字 请求
let mut last_register_resp = SystemTime::UNIX_EPOCH;
// 注册 unix 套接字 发送端
let mut last_register_sent = SystemTime::UNIX_EPOCH;
let mut last_dns_check = SystemTime::UNIX_EPOCH;
let mut old_latency = 0;
let mut ema_latency = 0;
loop {
select! {
Some(Ok((bytes, _))) = socket.next() => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::register_peer_response(rpr)) => {
update_latency();
if rpr.request_pk {
log::info!("request_pk received from {}", host);
allow_err!(rz.register_pk(&mut socket).await);
continue;
}
}
Some(rendezvous_message::Union::register_pk_response(rpr)) => {
update_latency();
match rpr.result.enum_value_or_default() {
register_pk_response::Result::OK => {
Config::set_key_confirmed(true);
Config::set_host_key_confirmed(&rz.host_prefix, true);
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
}
register_pk_response::Result::UUID_MISMATCH => {
allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
}
}
}
Some(rendezvous_message::Union::punch_hole(ph)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_punch_hole(ph, server).await);
});
}
Some(rendezvous_message::Union::request_relay(rr)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_request_relay(rr, server).await);
});
}
Some(rendezvous_message::Union::fetch_local_addr(fla)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::configure_update(cu)) => {
Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
Config::set_serial(cu.serial);
}
_ => {}
}
} else {
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
}
},
_ = timer.tick() => {
if Config::get_rendezvous_servers() != rz.rendezvous_servers {
break;
}
if !Config::get_option("stop-service").is_empty() {
break;
}
if rz.addr.port() == 0 {
allow_err!(rz.dns_check());
if rz.addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
}
}
let now = SystemTime::now();
if now.duration_since(last_timer).map(|d| d < TIMER_OUT).unwrap_or(false) {
// a workaround of tokio timer bug
continue;
}
last_timer = now;
let elapsed_resp = now.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(REG_INTERVAL);
let timeout = last_register_sent.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(0) >= REG_TIMEOUT;
if timeout || elapsed_resp >= REG_INTERVAL {
allow_err!(rz.register_peer(&mut socket).await);
last_register_sent = now;
if timeout {
fails += 1;
if fails > MAX_FAILS2 {
Config::update_latency(&host, -1);
old_latency = 0;
if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
if let Ok(_) = rz.dns_check() {
// in some case of network reconnect (dial IP network),
// old UDP socket not work any more after network recover
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
}
last_dns_check = now;
}
} else if fails > MAX_FAILS1 {
Config::update_latency(&host, 0);
old_latency = 0;
}
}
}
}
}
}
Ok(())
}
}
待续.......