replace libs/hbb_common with submodule (#502)

cargo update -p schannel to fix crash on higher rust toolchain, https://github.com/seanmonstar/reqwest/issues/2311

Signed-off-by: 21pages <sunboeasy@gmail.com>
This commit is contained in:
21pages 2025-01-20 17:34:22 +08:00 committed by GitHub
parent 772db7422f
commit 7a509f6975
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1397 additions and 5357 deletions

View File

@ -41,6 +41,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: recursive
- name: Install toolchain
uses: actions-rs/toolchain@v1
@ -80,6 +82,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: recursive
- name: Install toolchain
uses: actions-rs/toolchain@v1
@ -222,6 +226,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: recursive
- name: Download binaries
uses: actions/download-artifact@v3
@ -343,6 +349,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: recursive
- name: Download binaries
uses: actions/download-artifact@v3
@ -462,7 +470,9 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: recursive
- name: Set up QEMU
uses: docker/setup-qemu-action@v2

View File

@ -36,6 +36,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@v1
@ -85,6 +87,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Download binaries
uses: actions/download-artifact@v4
@ -216,6 +220,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Download binaries
uses: actions/download-artifact@v4

View File

@ -20,6 +20,7 @@ jobs:
profile: minimal
toolchain: stable
override: true
submodules: recursive
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
@ -34,6 +35,7 @@ jobs:
profile: minimal
toolchain: stable
override: true
submodules: recursive
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
@ -50,6 +52,7 @@ jobs:
toolchain: stable
override: true
components: rustfmt
submodules: recursive
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
@ -69,6 +72,7 @@ jobs:
toolchain: stable
override: true
components: clippy
submodules: recursive
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "libs/hbb_common"]
path = libs/hbb_common
url = https://github.com/rustdesk/hbb_common

1690
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -52,6 +52,13 @@ local-ip-address = "0.5.1"
dns-lookup = "1.0.8"
ping = "0.4.0"
[target.'cfg(any(target_os = "macos", target_os = "windows"))'.dependencies]
# https://github.com/rustdesk/rustdesk-server-pro/issues/189, using native-tls for better tls support
reqwest = { git = "https://github.com/rustdesk-org/reqwest", features = ["blocking", "socks", "json", "native-tls", "gzip"], default-features=false }
[target.'cfg(not(any(target_os = "macos", target_os = "windows")))'.dependencies]
reqwest = { git = "https://github.com/rustdesk-org/reqwest", features = ["blocking", "socks", "json", "rustls-tls", "rustls-tls-native-roots", "gzip"], default-features=false }
[build-dependencies]
hbb_common = { path = "libs/hbb_common" }

1
libs/hbb_common Submodule

@ -0,0 +1 @@
Subproject commit 49c6b24a7a8c39d4448e07b743007ef1a3febd43

View File

@ -1,4 +0,0 @@
/target
**/*.rs.bk
Cargo.lock
src/protos/

View File

@ -1,51 +0,0 @@
[package]
name = "hbb_common"
version = "0.1.0"
authors = ["open-trade <info@opentradesolutions.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
protobuf = { version = "3.1", features = ["with-bytes"] }
tokio = { version = "1.20", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
futures = "0.3"
bytes = { version = "1.2", features = ["serde"] }
log = "0.4"
env_logger = "0.9"
socket2 = { version = "0.3", features = ["reuseport"] }
zstd = "0.9"
quinn = {version = "0.8", optional = true }
anyhow = "1.0"
futures-util = "0.3"
directories-next = "2.0"
rand = "0.8"
serde_derive = "1.0"
serde = "1.0"
lazy_static = "1.4"
confy = { git = "https://github.com/open-trade/confy" }
dirs-next = "2.0"
filetime = "0.2"
sodiumoxide = "0.2"
regex = "1.4"
tokio-socks = { git = "https://github.com/open-trade/tokio-socks" }
chrono = "0.4"
[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]
mac_address = "1.1"
machine-uid = "0.2"
[features]
quic = []
flatpak = []
[build-dependencies]
protobuf-codegen = { version = "3.1" }
[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "0.3", features = ["winuser"] }
[dev-dependencies]
toml = "0.5"
serde_json = "1.0"

View File

@ -1,14 +0,0 @@
fn main() {
let out_dir = format!("{}/protos", std::env::var("OUT_DIR").unwrap());
std::fs::create_dir_all(&out_dir).unwrap();
protobuf_codegen::Codegen::new()
.pure()
.out_dir(out_dir)
.inputs(["protos/rendezvous.proto", "protos/message.proto"])
.include("protos")
.customize(protobuf_codegen::Customize::default().tokio_bytes(true))
.run()
.expect("Codegen failed.");
}

View File

@ -1,638 +0,0 @@
syntax = "proto3";
package hbb;
message EncodedVideoFrame {
bytes data = 1;
bool key = 2;
int64 pts = 3;
}
message EncodedVideoFrames { repeated EncodedVideoFrame frames = 1; }
message RGB { bool compress = 1; }
// planes data send directly in binary for better use arraybuffer on web
message YUV {
bool compress = 1;
int32 stride = 2;
}
message VideoFrame {
oneof union {
EncodedVideoFrames vp9s = 6;
RGB rgb = 7;
YUV yuv = 8;
EncodedVideoFrames h264s = 10;
EncodedVideoFrames h265s = 11;
}
int64 timestamp = 9;
}
message IdPk {
string id = 1;
bytes pk = 2;
}
message DisplayInfo {
sint32 x = 1;
sint32 y = 2;
int32 width = 3;
int32 height = 4;
string name = 5;
bool online = 6;
bool cursor_embedded = 7;
}
message PortForward {
string host = 1;
int32 port = 2;
}
message FileTransfer {
string dir = 1;
bool show_hidden = 2;
}
message LoginRequest {
string username = 1;
bytes password = 2;
string my_id = 4;
string my_name = 5;
OptionMessage option = 6;
oneof union {
FileTransfer file_transfer = 7;
PortForward port_forward = 8;
}
bool video_ack_required = 9;
uint64 session_id = 10;
string version = 11;
}
message ChatMessage { string text = 1; }
message Features {
bool privacy_mode = 1;
}
message SupportedEncoding {
bool h264 = 1;
bool h265 = 2;
}
message PeerInfo {
string username = 1;
string hostname = 2;
string platform = 3;
repeated DisplayInfo displays = 4;
int32 current_display = 5;
bool sas_enabled = 6;
string version = 7;
int32 conn_id = 8;
Features features = 9;
SupportedEncoding encoding = 10;
}
message LoginResponse {
oneof union {
string error = 1;
PeerInfo peer_info = 2;
}
}
message MouseEvent {
int32 mask = 1;
sint32 x = 2;
sint32 y = 3;
repeated ControlKey modifiers = 4;
}
enum KeyboardMode{
Legacy = 0;
Map = 1;
Translate = 2;
Auto = 3;
}
enum ControlKey {
Unknown = 0;
Alt = 1;
Backspace = 2;
CapsLock = 3;
Control = 4;
Delete = 5;
DownArrow = 6;
End = 7;
Escape = 8;
F1 = 9;
F10 = 10;
F11 = 11;
F12 = 12;
F2 = 13;
F3 = 14;
F4 = 15;
F5 = 16;
F6 = 17;
F7 = 18;
F8 = 19;
F9 = 20;
Home = 21;
LeftArrow = 22;
/// meta key (also known as "windows"; "super"; and "command")
Meta = 23;
/// option key on macOS (alt key on Linux and Windows)
Option = 24; // deprecated, use Alt instead
PageDown = 25;
PageUp = 26;
Return = 27;
RightArrow = 28;
Shift = 29;
Space = 30;
Tab = 31;
UpArrow = 32;
Numpad0 = 33;
Numpad1 = 34;
Numpad2 = 35;
Numpad3 = 36;
Numpad4 = 37;
Numpad5 = 38;
Numpad6 = 39;
Numpad7 = 40;
Numpad8 = 41;
Numpad9 = 42;
Cancel = 43;
Clear = 44;
Menu = 45; // deprecated, use Alt instead
Pause = 46;
Kana = 47;
Hangul = 48;
Junja = 49;
Final = 50;
Hanja = 51;
Kanji = 52;
Convert = 53;
Select = 54;
Print = 55;
Execute = 56;
Snapshot = 57;
Insert = 58;
Help = 59;
Sleep = 60;
Separator = 61;
Scroll = 62;
NumLock = 63;
RWin = 64;
Apps = 65;
Multiply = 66;
Add = 67;
Subtract = 68;
Decimal = 69;
Divide = 70;
Equals = 71;
NumpadEnter = 72;
RShift = 73;
RControl = 74;
RAlt = 75;
CtrlAltDel = 100;
LockScreen = 101;
}
message KeyEvent {
bool down = 1;
bool press = 2;
oneof union {
ControlKey control_key = 3;
uint32 chr = 4;
uint32 unicode = 5;
string seq = 6;
}
repeated ControlKey modifiers = 8;
KeyboardMode mode = 9;
}
message CursorData {
uint64 id = 1;
sint32 hotx = 2;
sint32 hoty = 3;
int32 width = 4;
int32 height = 5;
bytes colors = 6;
}
message CursorPosition {
sint32 x = 1;
sint32 y = 2;
}
message Hash {
string salt = 1;
string challenge = 2;
}
message Clipboard {
bool compress = 1;
bytes content = 2;
}
enum FileType {
Dir = 0;
DirLink = 2;
DirDrive = 3;
File = 4;
FileLink = 5;
}
message FileEntry {
FileType entry_type = 1;
string name = 2;
bool is_hidden = 3;
uint64 size = 4;
uint64 modified_time = 5;
}
message FileDirectory {
int32 id = 1;
string path = 2;
repeated FileEntry entries = 3;
}
message ReadDir {
string path = 1;
bool include_hidden = 2;
}
message ReadAllFiles {
int32 id = 1;
string path = 2;
bool include_hidden = 3;
}
message FileAction {
oneof union {
ReadDir read_dir = 1;
FileTransferSendRequest send = 2;
FileTransferReceiveRequest receive = 3;
FileDirCreate create = 4;
FileRemoveDir remove_dir = 5;
FileRemoveFile remove_file = 6;
ReadAllFiles all_files = 7;
FileTransferCancel cancel = 8;
FileTransferSendConfirmRequest send_confirm = 9;
}
}
message FileTransferCancel { int32 id = 1; }
message FileResponse {
oneof union {
FileDirectory dir = 1;
FileTransferBlock block = 2;
FileTransferError error = 3;
FileTransferDone done = 4;
FileTransferDigest digest = 5;
}
}
message FileTransferDigest {
int32 id = 1;
sint32 file_num = 2;
uint64 last_modified = 3;
uint64 file_size = 4;
bool is_upload = 5;
}
message FileTransferBlock {
int32 id = 1;
sint32 file_num = 2;
bytes data = 3;
bool compressed = 4;
uint32 blk_id = 5;
}
message FileTransferError {
int32 id = 1;
string error = 2;
sint32 file_num = 3;
}
message FileTransferSendRequest {
int32 id = 1;
string path = 2;
bool include_hidden = 3;
int32 file_num = 4;
}
message FileTransferSendConfirmRequest {
int32 id = 1;
sint32 file_num = 2;
oneof union {
bool skip = 3;
uint32 offset_blk = 4;
}
}
message FileTransferDone {
int32 id = 1;
sint32 file_num = 2;
}
message FileTransferReceiveRequest {
int32 id = 1;
string path = 2; // path written to
repeated FileEntry files = 3;
int32 file_num = 4;
}
message FileRemoveDir {
int32 id = 1;
string path = 2;
bool recursive = 3;
}
message FileRemoveFile {
int32 id = 1;
string path = 2;
sint32 file_num = 3;
}
message FileDirCreate {
int32 id = 1;
string path = 2;
}
// main logic from freeRDP
message CliprdrMonitorReady {
}
message CliprdrFormat {
int32 id = 2;
string format = 3;
}
message CliprdrServerFormatList {
repeated CliprdrFormat formats = 2;
}
message CliprdrServerFormatListResponse {
int32 msg_flags = 2;
}
message CliprdrServerFormatDataRequest {
int32 requested_format_id = 2;
}
message CliprdrServerFormatDataResponse {
int32 msg_flags = 2;
bytes format_data = 3;
}
message CliprdrFileContentsRequest {
int32 stream_id = 2;
int32 list_index = 3;
int32 dw_flags = 4;
int32 n_position_low = 5;
int32 n_position_high = 6;
int32 cb_requested = 7;
bool have_clip_data_id = 8;
int32 clip_data_id = 9;
}
message CliprdrFileContentsResponse {
int32 msg_flags = 3;
int32 stream_id = 4;
bytes requested_data = 5;
}
message Cliprdr {
oneof union {
CliprdrMonitorReady ready = 1;
CliprdrServerFormatList format_list = 2;
CliprdrServerFormatListResponse format_list_response = 3;
CliprdrServerFormatDataRequest format_data_request = 4;
CliprdrServerFormatDataResponse format_data_response = 5;
CliprdrFileContentsRequest file_contents_request = 6;
CliprdrFileContentsResponse file_contents_response = 7;
}
}
message SwitchDisplay {
int32 display = 1;
sint32 x = 2;
sint32 y = 3;
int32 width = 4;
int32 height = 5;
bool cursor_embedded = 6;
}
message PermissionInfo {
enum Permission {
Keyboard = 0;
Clipboard = 2;
Audio = 3;
File = 4;
Restart = 5;
Recording = 6;
}
Permission permission = 1;
bool enabled = 2;
}
enum ImageQuality {
NotSet = 0;
Low = 2;
Balanced = 3;
Best = 4;
}
message VideoCodecState {
enum PreferCodec {
Auto = 0;
VPX = 1;
H264 = 2;
H265 = 3;
}
int32 score_vpx = 1;
int32 score_h264 = 2;
int32 score_h265 = 3;
PreferCodec prefer = 4;
}
message OptionMessage {
enum BoolOption {
NotSet = 0;
No = 1;
Yes = 2;
}
ImageQuality image_quality = 1;
BoolOption lock_after_session_end = 2;
BoolOption show_remote_cursor = 3;
BoolOption privacy_mode = 4;
BoolOption block_input = 5;
int32 custom_image_quality = 6;
BoolOption disable_audio = 7;
BoolOption disable_clipboard = 8;
BoolOption enable_file_transfer = 9;
VideoCodecState video_codec_state = 10;
int32 custom_fps = 11;
}
message TestDelay {
int64 time = 1;
bool from_client = 2;
uint32 last_delay = 3;
uint32 target_bitrate = 4;
}
message PublicKey {
bytes asymmetric_value = 1;
bytes symmetric_value = 2;
}
message SignedId { bytes id = 1; }
message AudioFormat {
uint32 sample_rate = 1;
uint32 channels = 2;
}
message AudioFrame {
bytes data = 1;
int64 timestamp = 2;
}
// Notify peer to show message box.
message MessageBox {
// Message type. Refer to flutter/lib/common.dart/msgBox().
string msgtype = 1;
string title = 2;
// English
string text = 3;
// If not empty, msgbox provides a button to following the link.
// The link here can't be directly http url.
// It must be the key of http url configed in peer side or "rustdesk://*" (jump in app).
string link = 4;
}
message BackNotification {
// no need to consider block input by someone else
enum BlockInputState {
BlkStateUnknown = 0;
BlkOnSucceeded = 2;
BlkOnFailed = 3;
BlkOffSucceeded = 4;
BlkOffFailed = 5;
}
enum PrivacyModeState {
PrvStateUnknown = 0;
// Privacy mode on by someone else
PrvOnByOther = 2;
// Privacy mode is not supported on the remote side
PrvNotSupported = 3;
// Privacy mode on by self
PrvOnSucceeded = 4;
// Privacy mode on by self, but denied
PrvOnFailedDenied = 5;
// Some plugins are not found
PrvOnFailedPlugin = 6;
// Privacy mode on by self, but failed
PrvOnFailed = 7;
// Privacy mode off by self
PrvOffSucceeded = 8;
// Ctrl + P
PrvOffByPeer = 9;
// Privacy mode off by self, but failed
PrvOffFailed = 10;
PrvOffUnknown = 11;
}
oneof union {
PrivacyModeState privacy_mode_state = 1;
BlockInputState block_input_state = 2;
}
}
message ElevationRequestWithLogon {
string username = 1;
string password = 2;
}
message ElevationRequest {
oneof union {
bool direct = 1;
ElevationRequestWithLogon logon = 2;
}
}
message SwitchSidesRequest {
bytes uuid = 1;
}
message SwitchSidesResponse {
bytes uuid = 1;
LoginRequest lr = 2;
}
message SwitchBack {}
message Misc {
oneof union {
ChatMessage chat_message = 4;
SwitchDisplay switch_display = 5;
PermissionInfo permission_info = 6;
OptionMessage option = 7;
AudioFormat audio_format = 8;
string close_reason = 9;
bool refresh_video = 10;
bool video_received = 12;
BackNotification back_notification = 13;
bool restart_remote_device = 14;
bool uac = 15;
bool foreground_window_elevated = 16;
bool stop_service = 17;
ElevationRequest elevation_request = 18;
string elevation_response = 19;
bool portable_service_running = 20;
SwitchSidesRequest switch_sides_request = 21;
SwitchBack switch_back = 22;
}
}
message VoiceCallRequest {
int64 req_timestamp = 1;
// Indicates whether the request is a connect action or a disconnect action.
bool is_connect = 2;
}
message VoiceCallResponse {
bool accepted = 1;
int64 req_timestamp = 2; // Should copy from [VoiceCallRequest::req_timestamp].
int64 ack_timestamp = 3;
}
message Message {
oneof union {
SignedId signed_id = 3;
PublicKey public_key = 4;
TestDelay test_delay = 5;
VideoFrame video_frame = 6;
LoginRequest login_request = 7;
LoginResponse login_response = 8;
Hash hash = 9;
MouseEvent mouse_event = 10;
AudioFrame audio_frame = 11;
CursorData cursor_data = 12;
CursorPosition cursor_position = 13;
uint64 cursor_id = 14;
KeyEvent key_event = 15;
Clipboard clipboard = 16;
FileAction file_action = 17;
FileResponse file_response = 18;
Misc misc = 19;
Cliprdr cliprdr = 20;
MessageBox message_box = 21;
SwitchSidesResponse switch_sides_response = 22;
VoiceCallRequest voice_call_request = 23;
VoiceCallResponse voice_call_response = 24;
}
}

View File

@ -1,182 +0,0 @@
syntax = "proto3";
package hbb;
message RegisterPeer {
string id = 1;
int32 serial = 2;
}
enum ConnType {
DEFAULT_CONN = 0;
FILE_TRANSFER = 1;
PORT_FORWARD = 2;
RDP = 3;
}
message RegisterPeerResponse { bool request_pk = 2; }
message PunchHoleRequest {
string id = 1;
NatType nat_type = 2;
string licence_key = 3;
ConnType conn_type = 4;
string token = 5;
}
message PunchHole {
bytes socket_addr = 1;
string relay_server = 2;
NatType nat_type = 3;
}
message TestNatRequest {
int32 serial = 1;
}
// per my test, uint/int has no difference in encoding, int not good for negative, use sint for negative
message TestNatResponse {
int32 port = 1;
ConfigUpdate cu = 2; // for mobile
}
enum NatType {
UNKNOWN_NAT = 0;
ASYMMETRIC = 1;
SYMMETRIC = 2;
}
message PunchHoleSent {
bytes socket_addr = 1;
string id = 2;
string relay_server = 3;
NatType nat_type = 4;
string version = 5;
}
message RegisterPk {
string id = 1;
bytes uuid = 2;
bytes pk = 3;
string old_id = 4;
}
message RegisterPkResponse {
enum Result {
OK = 0;
UUID_MISMATCH = 2;
ID_EXISTS = 3;
TOO_FREQUENT = 4;
INVALID_ID_FORMAT = 5;
NOT_SUPPORT = 6;
SERVER_ERROR = 7;
}
Result result = 1;
}
message PunchHoleResponse {
bytes socket_addr = 1;
bytes pk = 2;
enum Failure {
ID_NOT_EXIST = 0;
OFFLINE = 2;
LICENSE_MISMATCH = 3;
LICENSE_OVERUSE = 4;
}
Failure failure = 3;
string relay_server = 4;
oneof union {
NatType nat_type = 5;
bool is_local = 6;
}
string other_failure = 7;
}
message ConfigUpdate {
int32 serial = 1;
repeated string rendezvous_servers = 2;
}
message RequestRelay {
string id = 1;
string uuid = 2;
bytes socket_addr = 3;
string relay_server = 4;
bool secure = 5;
string licence_key = 6;
ConnType conn_type = 7;
string token = 8;
}
message RelayResponse {
bytes socket_addr = 1;
string uuid = 2;
string relay_server = 3;
oneof union {
string id = 4;
bytes pk = 5;
}
string refuse_reason = 6;
string version = 7;
}
message SoftwareUpdate { string url = 1; }
// if in same intranet, punch hole won't work both for udp and tcp,
// even some router has below connection error if we connect itself,
// { kind: Other, error: "could not resolve to any address" },
// so we request local address to connect.
message FetchLocalAddr {
bytes socket_addr = 1;
string relay_server = 2;
}
message LocalAddr {
bytes socket_addr = 1;
bytes local_addr = 2;
string relay_server = 3;
string id = 4;
string version = 5;
}
message PeerDiscovery {
string cmd = 1;
string mac = 2;
string id = 3;
string username = 4;
string hostname = 5;
string platform = 6;
string misc = 7;
}
message OnlineRequest {
string id = 1;
repeated string peers = 2;
}
message OnlineResponse {
bytes states = 1;
}
message RendezvousMessage {
oneof union {
RegisterPeer register_peer = 6;
RegisterPeerResponse register_peer_response = 7;
PunchHoleRequest punch_hole_request = 8;
PunchHole punch_hole = 9;
PunchHoleSent punch_hole_sent = 10;
PunchHoleResponse punch_hole_response = 11;
FetchLocalAddr fetch_local_addr = 12;
LocalAddr local_addr = 13;
ConfigUpdate configure_update = 14;
RegisterPk register_pk = 15;
RegisterPkResponse register_pk_response = 16;
SoftwareUpdate software_update = 17;
RequestRelay request_relay = 18;
RelayResponse relay_response = 19;
TestNatRequest test_nat_request = 20;
TestNatResponse test_nat_response = 21;
PeerDiscovery peer_discovery = 22;
OnlineRequest online_request = 23;
OnlineResponse online_response = 24;
}
}

View File

@ -1,280 +0,0 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io;
use tokio_util::codec::{Decoder, Encoder};
#[derive(Debug, Clone, Copy)]
pub struct BytesCodec {
state: DecodeState,
raw: bool,
max_packet_length: usize,
}
#[derive(Debug, Clone, Copy)]
enum DecodeState {
Head,
Data(usize),
}
impl Default for BytesCodec {
fn default() -> Self {
Self::new()
}
}
impl BytesCodec {
pub fn new() -> Self {
Self {
state: DecodeState::Head,
raw: false,
max_packet_length: usize::MAX,
}
}
pub fn set_raw(&mut self) {
self.raw = true;
}
pub fn set_max_packet_length(&mut self, n: usize) {
self.max_packet_length = n;
}
fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
if src.is_empty() {
return Ok(None);
}
let head_len = ((src[0] & 0x3) + 1) as usize;
if src.len() < head_len {
return Ok(None);
}
let mut n = src[0] as usize;
if head_len > 1 {
n |= (src[1] as usize) << 8;
}
if head_len > 2 {
n |= (src[2] as usize) << 16;
}
if head_len > 3 {
n |= (src[3] as usize) << 24;
}
n >>= 2;
if n > self.max_packet_length {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Too big packet"));
}
src.advance(head_len);
src.reserve(n);
Ok(Some(n))
}
fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
if src.len() < n {
return Ok(None);
}
Ok(Some(src.split_to(n)))
}
}
impl Decoder for BytesCodec {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
if self.raw {
if !src.is_empty() {
let len = src.len();
return Ok(Some(src.split_to(len)));
} else {
return Ok(None);
}
}
let n = match self.state {
DecodeState::Head => match self.decode_head(src)? {
Some(n) => {
self.state = DecodeState::Data(n);
n
}
None => return Ok(None),
},
DecodeState::Data(n) => n,
};
match self.decode_data(n, src)? {
Some(data) => {
self.state = DecodeState::Head;
Ok(Some(data))
}
None => Ok(None),
}
}
}
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;
fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
if self.raw {
buf.reserve(data.len());
buf.put(data);
return Ok(());
}
if data.len() <= 0x3F {
buf.put_u8((data.len() << 2) as u8);
} else if data.len() <= 0x3FFF {
buf.put_u16_le((data.len() << 2) as u16 | 0x1);
} else if data.len() <= 0x3FFFFF {
let h = (data.len() << 2) as u32 | 0x2;
buf.put_u16_le((h & 0xFFFF) as u16);
buf.put_u8((h >> 16) as u8);
} else if data.len() <= 0x3FFFFFFF {
buf.put_u32_le((data.len() << 2) as u32 | 0x3);
} else {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Overflow"));
}
buf.extend(data);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_codec1() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
bytes.resize(0x3F, 1);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
let buf_saved = buf.clone();
assert_eq!(buf.len(), 0x3F + 1);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3F);
assert_eq!(res[0], 1);
} else {
panic!();
}
let mut codec2 = BytesCodec::new();
let mut buf2 = BytesMut::new();
if let Ok(None) = codec2.decode(&mut buf2) {
} else {
panic!();
}
buf2.extend(&buf_saved[0..1]);
if let Ok(None) = codec2.decode(&mut buf2) {
} else {
panic!();
}
buf2.extend(&buf_saved[1..]);
if let Ok(Some(res)) = codec2.decode(&mut buf2) {
assert_eq!(res.len(), 0x3F);
assert_eq!(res[0], 1);
} else {
panic!();
}
}
#[test]
fn test_codec2() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
assert!(codec.encode("".into(), &mut buf).is_ok());
assert_eq!(buf.len(), 1);
bytes.resize(0x3F + 1, 2);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
assert_eq!(buf.len(), 0x3F + 2 + 2);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0);
} else {
panic!();
}
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3F + 1);
assert_eq!(res[0], 2);
} else {
panic!();
}
}
#[test]
fn test_codec3() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
bytes.resize(0x3F - 1, 3);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
assert_eq!(buf.len(), 0x3F + 1 - 1);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3F - 1);
assert_eq!(res[0], 3);
} else {
panic!();
}
}
#[test]
fn test_codec4() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
bytes.resize(0x3FFF, 4);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
assert_eq!(buf.len(), 0x3FFF + 2);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3FFF);
assert_eq!(res[0], 4);
} else {
panic!();
}
}
#[test]
fn test_codec5() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
bytes.resize(0x3FFFFF, 5);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
assert_eq!(buf.len(), 0x3FFFFF + 3);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3FFFFF);
assert_eq!(res[0], 5);
} else {
panic!();
}
}
#[test]
fn test_codec6() {
let mut codec = BytesCodec::new();
let mut buf = BytesMut::new();
let mut bytes: Vec<u8> = Vec::new();
bytes.resize(0x3FFFFF + 1, 6);
assert!(codec.encode(bytes.into(), &mut buf).is_ok());
let buf_saved = buf.clone();
assert_eq!(buf.len(), 0x3FFFFF + 4 + 1);
if let Ok(Some(res)) = codec.decode(&mut buf) {
assert_eq!(res.len(), 0x3FFFFF + 1);
assert_eq!(res[0], 6);
} else {
panic!();
}
let mut codec2 = BytesCodec::new();
let mut buf2 = BytesMut::new();
buf2.extend(&buf_saved[0..1]);
if let Ok(None) = codec2.decode(&mut buf2) {
} else {
panic!();
}
buf2.extend(&buf_saved[1..6]);
if let Ok(None) = codec2.decode(&mut buf2) {
} else {
panic!();
}
buf2.extend(&buf_saved[6..]);
if let Ok(Some(res)) = codec2.decode(&mut buf2) {
assert_eq!(res.len(), 0x3FFFFF + 1);
assert_eq!(res[0], 6);
} else {
panic!();
}
}
}

View File

@ -1,45 +0,0 @@
use std::cell::RefCell;
use zstd::block::{Compressor, Decompressor};
thread_local! {
static COMPRESSOR: RefCell<Compressor> = RefCell::new(Compressor::new());
static DECOMPRESSOR: RefCell<Decompressor> = RefCell::new(Decompressor::new());
}
/// The library supports regular compression levels from 1 up to ZSTD_maxCLevel(),
/// which is currently 22. Levels >= 20
/// Default level is ZSTD_CLEVEL_DEFAULT==3.
/// value 0 means default, which is controlled by ZSTD_CLEVEL_DEFAULT
pub fn compress(data: &[u8], level: i32) -> Vec<u8> {
let mut out = Vec::new();
COMPRESSOR.with(|c| {
if let Ok(mut c) = c.try_borrow_mut() {
match c.compress(data, level) {
Ok(res) => out = res,
Err(err) => {
crate::log::debug!("Failed to compress: {}", err);
}
}
}
});
out
}
pub fn decompress(data: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
DECOMPRESSOR.with(|d| {
if let Ok(mut d) = d.try_borrow_mut() {
const MAX: usize = 1024 * 1024 * 64;
const MIN: usize = 1024 * 1024;
let mut n = 30 * data.len();
n = n.clamp(MIN, MAX);
match d.decompress(data, n) {
Ok(res) => out = res,
Err(err) => {
crate::log::debug!("Failed to decompress: {}", err);
}
}
}
});
out
}

File diff suppressed because it is too large Load Diff

View File

@ -1,839 +0,0 @@
#[cfg(windows)]
use std::os::windows::prelude::*;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde_derive::{Deserialize, Serialize};
use tokio::{fs::File, io::*};
use crate::{bail, get_version_number, message_proto::*, ResultType, Stream};
// https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
use crate::{
compress::{compress, decompress},
config::{Config, COMPRESS_LEVEL},
};
pub fn read_dir(path: &Path, include_hidden: bool) -> ResultType<FileDirectory> {
let mut dir = FileDirectory {
path: get_string(path),
..Default::default()
};
#[cfg(windows)]
if "/" == &get_string(path) {
let drives = unsafe { winapi::um::fileapi::GetLogicalDrives() };
for i in 0..32 {
if drives & (1 << i) != 0 {
let name = format!(
"{}:",
std::char::from_u32('A' as u32 + i as u32).unwrap_or('A')
);
dir.entries.push(FileEntry {
name,
entry_type: FileType::DirDrive.into(),
..Default::default()
});
}
}
return Ok(dir);
}
for entry in path.read_dir()?.flatten() {
let p = entry.path();
let name = p
.file_name()
.map(|p| p.to_str().unwrap_or(""))
.unwrap_or("")
.to_owned();
if name.is_empty() {
continue;
}
let mut is_hidden = false;
let meta;
if let Ok(tmp) = std::fs::symlink_metadata(&p) {
meta = tmp;
} else {
continue;
}
// docs.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants
#[cfg(windows)]
if meta.file_attributes() & 0x2 != 0 {
is_hidden = true;
}
#[cfg(not(windows))]
if name.find('.').unwrap_or(usize::MAX) == 0 {
is_hidden = true;
}
if is_hidden && !include_hidden {
continue;
}
let (entry_type, size) = {
if p.is_dir() {
if meta.file_type().is_symlink() {
(FileType::DirLink.into(), 0)
} else {
(FileType::Dir.into(), 0)
}
} else if meta.file_type().is_symlink() {
(FileType::FileLink.into(), 0)
} else {
(FileType::File.into(), meta.len())
}
};
let modified_time = meta
.modified()
.map(|x| {
x.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
.unwrap_or(0)
})
.unwrap_or(0);
dir.entries.push(FileEntry {
name: get_file_name(&p),
entry_type,
is_hidden,
size,
modified_time,
..Default::default()
});
}
Ok(dir)
}
#[inline]
pub fn get_file_name(p: &Path) -> String {
p.file_name()
.map(|p| p.to_str().unwrap_or(""))
.unwrap_or("")
.to_owned()
}
#[inline]
pub fn get_string(path: &Path) -> String {
path.to_str().unwrap_or("").to_owned()
}
#[inline]
pub fn get_path(path: &str) -> PathBuf {
Path::new(path).to_path_buf()
}
#[inline]
pub fn get_home_as_string() -> String {
get_string(&Config::get_home())
}
fn read_dir_recursive(
path: &PathBuf,
prefix: &Path,
include_hidden: bool,
) -> ResultType<Vec<FileEntry>> {
let mut files = Vec::new();
if path.is_dir() {
// to-do: symbol link handling, cp the link rather than the content
// to-do: file mode, for unix
let fd = read_dir(path, include_hidden)?;
for entry in fd.entries.iter() {
match entry.entry_type.enum_value() {
Ok(FileType::File) => {
let mut entry = entry.clone();
entry.name = get_string(&prefix.join(entry.name));
files.push(entry);
}
Ok(FileType::Dir) => {
if let Ok(mut tmp) = read_dir_recursive(
&path.join(&entry.name),
&prefix.join(&entry.name),
include_hidden,
) {
for entry in tmp.drain(0..) {
files.push(entry);
}
}
}
_ => {}
}
}
Ok(files)
} else if path.is_file() {
let (size, modified_time) = if let Ok(meta) = std::fs::metadata(path) {
(
meta.len(),
meta.modified()
.map(|x| {
x.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
.unwrap_or(0)
})
.unwrap_or(0),
)
} else {
(0, 0)
};
files.push(FileEntry {
entry_type: FileType::File.into(),
size,
modified_time,
..Default::default()
});
Ok(files)
} else {
bail!("Not exists");
}
}
pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<FileEntry>> {
read_dir_recursive(&get_path(path), &get_path(""), include_hidden)
}
#[inline]
pub fn is_file_exists(file_path: &str) -> bool {
return Path::new(file_path).exists();
}
#[inline]
pub fn can_enable_overwrite_detection(version: i64) -> bool {
version >= get_version_number("1.1.10")
}
#[derive(Default)]
pub struct TransferJob {
pub id: i32,
pub remote: String,
pub path: PathBuf,
pub show_hidden: bool,
pub is_remote: bool,
pub is_last_job: bool,
pub file_num: i32,
pub files: Vec<FileEntry>,
file: Option<File>,
total_size: u64,
finished_size: u64,
transferred: u64,
enable_overwrite_detection: bool,
file_confirmed: bool,
// indicating the last file is skipped
file_skipped: bool,
file_is_waiting: bool,
default_overwrite_strategy: Option<bool>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct TransferJobMeta {
#[serde(default)]
pub id: i32,
#[serde(default)]
pub remote: String,
#[serde(default)]
pub to: String,
#[serde(default)]
pub show_hidden: bool,
#[serde(default)]
pub file_num: i32,
#[serde(default)]
pub is_remote: bool,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RemoveJobMeta {
#[serde(default)]
pub path: String,
#[serde(default)]
pub is_remote: bool,
#[serde(default)]
pub no_confirm: bool,
}
#[inline]
fn get_ext(name: &str) -> &str {
if let Some(i) = name.rfind('.') {
return &name[i + 1..];
}
""
}
#[inline]
fn is_compressed_file(name: &str) -> bool {
let ext = get_ext(name);
ext == "xz"
|| ext == "gz"
|| ext == "zip"
|| ext == "7z"
|| ext == "rar"
|| ext == "bz2"
|| ext == "tgz"
|| ext == "png"
|| ext == "jpg"
}
impl TransferJob {
#[allow(clippy::too_many_arguments)]
pub fn new_write(
id: i32,
remote: String,
path: String,
file_num: i32,
show_hidden: bool,
is_remote: bool,
files: Vec<FileEntry>,
enable_overwrite_detection: bool,
) -> Self {
log::info!("new write {}", path);
let total_size = files.iter().map(|x| x.size).sum();
Self {
id,
remote,
path: get_path(&path),
file_num,
show_hidden,
is_remote,
files,
total_size,
enable_overwrite_detection,
..Default::default()
}
}
pub fn new_read(
id: i32,
remote: String,
path: String,
file_num: i32,
show_hidden: bool,
is_remote: bool,
enable_overwrite_detection: bool,
) -> ResultType<Self> {
log::info!("new read {}", path);
let files = get_recursive_files(&path, show_hidden)?;
let total_size = files.iter().map(|x| x.size).sum();
Ok(Self {
id,
remote,
path: get_path(&path),
file_num,
show_hidden,
is_remote,
files,
total_size,
enable_overwrite_detection,
..Default::default()
})
}
#[inline]
pub fn files(&self) -> &Vec<FileEntry> {
&self.files
}
#[inline]
pub fn set_files(&mut self, files: Vec<FileEntry>) {
self.files = files;
}
#[inline]
pub fn id(&self) -> i32 {
self.id
}
#[inline]
pub fn total_size(&self) -> u64 {
self.total_size
}
#[inline]
pub fn finished_size(&self) -> u64 {
self.finished_size
}
#[inline]
pub fn transferred(&self) -> u64 {
self.transferred
}
#[inline]
pub fn file_num(&self) -> i32 {
self.file_num
}
pub fn modify_time(&self) {
let file_num = self.file_num as usize;
if file_num < self.files.len() {
let entry = &self.files[file_num];
let path = self.join(&entry.name);
let download_path = format!("{}.download", get_string(&path));
std::fs::rename(download_path, &path).ok();
filetime::set_file_mtime(
&path,
filetime::FileTime::from_unix_time(entry.modified_time as _, 0),
)
.ok();
}
}
pub fn remove_download_file(&self) {
let file_num = self.file_num as usize;
if file_num < self.files.len() {
let entry = &self.files[file_num];
let path = self.join(&entry.name);
let download_path = format!("{}.download", get_string(&path));
std::fs::remove_file(download_path).ok();
}
}
pub async fn write(&mut self, block: FileTransferBlock) -> ResultType<()> {
if block.id != self.id {
bail!("Wrong id");
}
let file_num = block.file_num as usize;
if file_num >= self.files.len() {
bail!("Wrong file number");
}
if file_num != self.file_num as usize || self.file.is_none() {
self.modify_time();
if let Some(file) = self.file.as_mut() {
file.sync_all().await?;
}
self.file_num = block.file_num;
let entry = &self.files[file_num];
let path = self.join(&entry.name);
if let Some(p) = path.parent() {
std::fs::create_dir_all(p).ok();
}
let path = format!("{}.download", get_string(&path));
self.file = Some(File::create(&path).await?);
}
if block.compressed {
let tmp = decompress(&block.data);
self.file.as_mut().unwrap().write_all(&tmp).await?;
self.finished_size += tmp.len() as u64;
} else {
self.file.as_mut().unwrap().write_all(&block.data).await?;
self.finished_size += block.data.len() as u64;
}
self.transferred += block.data.len() as u64;
Ok(())
}
#[inline]
pub fn join(&self, name: &str) -> PathBuf {
if name.is_empty() {
self.path.clone()
} else {
self.path.join(name)
}
}
pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
let file_num = self.file_num as usize;
if file_num >= self.files.len() {
self.file.take();
return Ok(None);
}
let name = &self.files[file_num].name;
if self.file.is_none() {
match File::open(self.join(name)).await {
Ok(file) => {
self.file = Some(file);
self.file_confirmed = false;
self.file_is_waiting = false;
}
Err(err) => {
self.file_num += 1;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
}
}
if self.enable_overwrite_detection && !self.file_confirmed() {
if !self.file_is_waiting() {
self.send_current_digest(stream).await?;
self.set_file_is_waiting(true);
}
return Ok(None);
}
const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = vec![0; BUF_SIZE];
let mut compressed = false;
let mut offset: usize = 0;
loop {
match self.file.as_mut().unwrap().read(&mut buf[offset..]).await {
Err(err) => {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
Ok(n) => {
offset += n;
if n == 0 || offset == BUF_SIZE {
break;
}
}
}
}
unsafe { buf.set_len(offset) };
if offset == 0 {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
} else {
self.finished_size += offset as u64;
if !is_compressed_file(name) {
let tmp = compress(&buf, COMPRESS_LEVEL);
if tmp.len() < buf.len() {
buf = tmp;
compressed = true;
}
}
self.transferred += buf.len() as u64;
}
Ok(Some(FileTransferBlock {
id: self.id,
file_num: file_num as _,
data: buf.into(),
compressed,
..Default::default()
}))
}
async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
let mut msg = Message::new();
let mut resp = FileResponse::new();
let meta = self.file.as_ref().unwrap().metadata().await?;
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
resp.set_digest(FileTransferDigest {
id: self.id,
file_num: self.file_num,
last_modified,
file_size: meta.len(),
..Default::default()
});
msg.set_file_response(resp);
stream.send(&msg).await?;
log::info!(
"id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}",
self.id,
self.file_num,
msg
);
Ok(())
}
pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) {
self.default_overwrite_strategy = overwrite_strategy;
}
pub fn default_overwrite_strategy(&self) -> Option<bool> {
self.default_overwrite_strategy
}
pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
log::info!("id: {}, file_confirmed: {}", self.id, file_confirmed);
self.file_confirmed = file_confirmed;
self.file_skipped = false;
}
pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
self.file_is_waiting = file_is_waiting;
}
#[inline]
pub fn file_is_waiting(&self) -> bool {
self.file_is_waiting
}
#[inline]
pub fn file_confirmed(&self) -> bool {
self.file_confirmed
}
/// Indicating whether the last file is skipped
#[inline]
pub fn file_skipped(&self) -> bool {
self.file_skipped
}
/// Indicating whether the whole task is skipped
#[inline]
pub fn job_skipped(&self) -> bool {
self.file_skipped() && self.files.len() == 1
}
/// Check whether the job is completed after `read` returns `None`
/// This is a helper function which gives additional lifecycle when the job reads `None`.
/// If returns `true`, it means we can delete the job automatically. `False` otherwise.
///
/// [`Note`]
/// Conditions:
/// 1. Files are not waiting for confirmation by peers.
#[inline]
pub fn job_completed(&self) -> bool {
// has no error, Condition 2
!self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting)
}
/// Get job error message, useful for getting status when job had finished
pub fn job_error(&self) -> Option<String> {
if self.job_skipped() {
return Some("skipped".to_string());
}
None
}
pub fn set_file_skipped(&mut self) -> bool {
log::debug!("skip file {} in job {}", self.file_num, self.id);
self.file.take();
self.set_file_confirmed(false);
self.set_file_is_waiting(false);
self.file_num += 1;
self.file_skipped = true;
true
}
pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
if self.file_num() != r.file_num {
log::info!("file num truncated, ignoring");
} else {
match r.union {
Some(file_transfer_send_confirm_request::Union::Skip(s)) => {
if s {
self.set_file_skipped();
} else {
self.set_file_confirmed(true);
}
}
Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => {
self.set_file_confirmed(true);
}
_ => {}
}
}
true
}
#[inline]
pub fn gen_meta(&self) -> TransferJobMeta {
TransferJobMeta {
id: self.id,
remote: self.remote.to_string(),
to: self.path.to_string_lossy().to_string(),
file_num: self.file_num,
show_hidden: self.show_hidden,
is_remote: self.is_remote,
}
}
}
#[inline]
pub fn new_error<T: std::string::ToString>(id: i32, err: T, file_num: i32) -> Message {
let mut resp = FileResponse::new();
resp.set_error(FileTransferError {
id,
error: err.to_string(),
file_num,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_file_response(resp);
msg_out
}
#[inline]
pub fn new_dir(id: i32, path: String, files: Vec<FileEntry>) -> Message {
let mut resp = FileResponse::new();
resp.set_dir(FileDirectory {
id,
path,
entries: files,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_file_response(resp);
msg_out
}
#[inline]
pub fn new_block(block: FileTransferBlock) -> Message {
let mut resp = FileResponse::new();
resp.set_block(block);
let mut msg_out = Message::new();
msg_out.set_file_response(resp);
msg_out
}
#[inline]
pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message {
let mut msg_out = Message::new();
let mut action = FileAction::new();
action.set_send_confirm(r);
msg_out.set_file_action(action);
msg_out
}
#[inline]
pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec<FileEntry>) -> Message {
let mut action = FileAction::new();
action.set_receive(FileTransferReceiveRequest {
id,
path,
files,
file_num,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_file_action(action);
msg_out
}
#[inline]
pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message {
log::info!("new send: {},id : {}", path, id);
let mut action = FileAction::new();
action.set_send(FileTransferSendRequest {
id,
path,
include_hidden,
file_num,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_file_action(action);
msg_out
}
#[inline]
pub fn new_done(id: i32, file_num: i32) -> Message {
let mut resp = FileResponse::new();
resp.set_done(FileTransferDone {
id,
file_num,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_file_response(resp);
msg_out
}
#[inline]
pub fn remove_job(id: i32, jobs: &mut Vec<TransferJob>) {
*jobs = jobs.drain(0..).filter(|x| x.id() != id).collect();
}
#[inline]
pub fn get_job(id: i32, jobs: &mut [TransferJob]) -> Option<&mut TransferJob> {
jobs.iter_mut().find(|x| x.id() == id)
}
pub async fn handle_read_jobs(
jobs: &mut Vec<TransferJob>,
stream: &mut crate::Stream,
) -> ResultType<()> {
let mut finished = Vec::new();
for job in jobs.iter_mut() {
if job.is_last_job {
continue;
}
match job.read(stream).await {
Err(err) => {
stream
.send(&new_error(job.id(), err, job.file_num()))
.await?;
}
Ok(Some(block)) => {
stream.send(&new_block(block)).await?;
}
Ok(None) => {
if job.job_completed() {
finished.push(job.id());
let err = job.job_error();
if err.is_some() {
stream
.send(&new_error(job.id(), err.unwrap(), job.file_num()))
.await?;
} else {
stream.send(&new_done(job.id(), job.file_num())).await?;
}
} else {
// waiting confirmation.
}
}
}
}
for id in finished {
remove_job(id, jobs);
}
Ok(())
}
pub fn remove_all_empty_dir(path: &PathBuf) -> ResultType<()> {
let fd = read_dir(path, true)?;
for entry in fd.entries.iter() {
match entry.entry_type.enum_value() {
Ok(FileType::Dir) => {
remove_all_empty_dir(&path.join(&entry.name)).ok();
}
Ok(FileType::DirLink) | Ok(FileType::FileLink) => {
std::fs::remove_file(path.join(&entry.name)).ok();
}
_ => {}
}
}
std::fs::remove_dir(path).ok();
Ok(())
}
#[inline]
pub fn remove_file(file: &str) -> ResultType<()> {
std::fs::remove_file(get_path(file))?;
Ok(())
}
#[inline]
pub fn create_dir(dir: &str) -> ResultType<()> {
std::fs::create_dir_all(get_path(dir))?;
Ok(())
}
#[inline]
pub fn transform_windows_path(entries: &mut Vec<FileEntry>) {
for entry in entries {
entry.name = entry.name.replace('\\', "/");
}
}
pub enum DigestCheckResult {
IsSame,
NeedConfirm(FileTransferDigest),
NoSuchFile,
}
#[inline]
pub fn is_write_need_confirmation(
file_path: &str,
digest: &FileTransferDigest,
) -> ResultType<DigestCheckResult> {
let path = Path::new(file_path);
if path.exists() && path.is_file() {
let metadata = std::fs::metadata(path)?;
let modified_time = metadata.modified()?;
let remote_mt = Duration::from_secs(digest.last_modified);
let local_mt = modified_time.duration_since(UNIX_EPOCH)?;
if remote_mt == local_mt && digest.file_size == metadata.len() {
return Ok(DigestCheckResult::IsSame);
}
Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
id: digest.id,
file_num: digest.file_num,
last_modified: local_mt.as_secs(),
file_size: metadata.len(),
..Default::default()
}))
} else {
Ok(DigestCheckResult::NoSuchFile)
}
}

View File

@ -1,39 +0,0 @@
use std::{fmt, slice::Iter, str::FromStr};
use crate::protos::message::KeyboardMode;
impl fmt::Display for KeyboardMode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
KeyboardMode::Legacy => write!(f, "legacy"),
KeyboardMode::Map => write!(f, "map"),
KeyboardMode::Translate => write!(f, "translate"),
KeyboardMode::Auto => write!(f, "auto"),
}
}
}
impl FromStr for KeyboardMode {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"legacy" => Ok(KeyboardMode::Legacy),
"map" => Ok(KeyboardMode::Map),
"translate" => Ok(KeyboardMode::Translate),
"auto" => Ok(KeyboardMode::Auto),
_ => Err(()),
}
}
}
impl KeyboardMode {
pub fn iter() -> Iter<'static, KeyboardMode> {
const KEYBOARD_MODES: [KeyboardMode; 4] = [
KeyboardMode::Legacy,
KeyboardMode::Map,
KeyboardMode::Translate,
KeyboardMode::Auto,
];
KEYBOARD_MODES.iter()
}
}

View File

@ -1,390 +0,0 @@
pub mod compress;
pub mod protos;
pub use bytes;
use config::Config;
pub use futures;
pub use protobuf;
pub use protos::message as message_proto;
pub use protos::rendezvous as rendezvous_proto;
use std::{
fs::File,
io::{self, BufRead},
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
path::Path,
time::{self, SystemTime, UNIX_EPOCH},
};
pub use tokio;
pub use tokio_util;
pub mod socket_client;
pub mod tcp;
pub mod udp;
pub use env_logger;
pub use log;
pub mod bytes_codec;
#[cfg(feature = "quic")]
pub mod quic;
pub use anyhow::{self, bail};
pub use futures_util;
pub mod config;
pub mod fs;
pub use lazy_static;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
pub use mac_address;
pub use rand;
pub use regex;
pub use sodiumoxide;
pub use tokio_socks;
pub use tokio_socks::IntoTargetAddr;
pub use tokio_socks::TargetAddr;
pub mod password_security;
pub use chrono;
pub use directories_next;
pub mod keyboard;
#[cfg(feature = "quic")]
pub type Stream = quic::Connection;
#[cfg(not(feature = "quic"))]
pub type Stream = tcp::FramedStream;
#[inline]
pub async fn sleep(sec: f32) {
tokio::time::sleep(time::Duration::from_secs_f32(sec)).await;
}
#[macro_export]
macro_rules! allow_err {
($e:expr) => {
if let Err(err) = $e {
log::debug!(
"{:?}, {}:{}:{}:{}",
err,
module_path!(),
file!(),
line!(),
column!()
);
} else {
}
};
($e:expr, $($arg:tt)*) => {
if let Err(err) = $e {
log::debug!(
"{:?}, {}, {}:{}:{}:{}",
err,
format_args!($($arg)*),
module_path!(),
file!(),
line!(),
column!()
);
} else {
}
};
}
#[inline]
pub fn timeout<T: std::future::Future>(ms: u64, future: T) -> tokio::time::Timeout<T> {
tokio::time::timeout(std::time::Duration::from_millis(ms), future)
}
pub type ResultType<F, E = anyhow::Error> = anyhow::Result<F, E>;
/// Certain router and firewalls scan the packet and if they
/// find an IP address belonging to their pool that they use to do the NAT mapping/translation, so here we mangle the ip address
pub struct AddrMangle();
#[inline]
pub fn try_into_v4(addr: SocketAddr) -> SocketAddr {
match addr {
SocketAddr::V6(v6) if !addr.ip().is_loopback() => {
if let Some(v4) = v6.ip().to_ipv4() {
SocketAddr::new(IpAddr::V4(v4), addr.port())
} else {
addr
}
}
_ => addr,
}
}
impl AddrMangle {
pub fn encode(addr: SocketAddr) -> Vec<u8> {
// not work with [:1]:<port>
let addr = try_into_v4(addr);
match addr {
SocketAddr::V4(addr_v4) => {
let tm = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as u32) as u128;
let ip = u32::from_le_bytes(addr_v4.ip().octets()) as u128;
let port = addr.port() as u128;
let v = ((ip + tm) << 49) | (tm << 17) | (port + (tm & 0xFFFF));
let bytes = v.to_le_bytes();
let mut n_padding = 0;
for i in bytes.iter().rev() {
if i == &0u8 {
n_padding += 1;
} else {
break;
}
}
bytes[..(16 - n_padding)].to_vec()
}
SocketAddr::V6(addr_v6) => {
let mut x = addr_v6.ip().octets().to_vec();
let port: [u8; 2] = addr_v6.port().to_le_bytes();
x.push(port[0]);
x.push(port[1]);
x
}
}
}
pub fn decode(bytes: &[u8]) -> SocketAddr {
use std::convert::TryInto;
if bytes.len() > 16 {
if bytes.len() != 18 {
return Config::get_any_listen_addr(false);
}
let tmp: [u8; 2] = bytes[16..].try_into().unwrap();
let port = u16::from_le_bytes(tmp);
let tmp: [u8; 16] = bytes[..16].try_into().unwrap();
let ip = std::net::Ipv6Addr::from(tmp);
return SocketAddr::new(IpAddr::V6(ip), port);
}
let mut padded = [0u8; 16];
padded[..bytes.len()].copy_from_slice(bytes);
let number = u128::from_le_bytes(padded);
let tm = (number >> 17) & (u32::max_value() as u128);
let ip = (((number >> 49) - tm) as u32).to_le_bytes();
let port = (number & 0xFFFFFF) - (tm & 0xFFFF);
SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]),
port as u16,
))
}
}
pub fn get_version_from_url(url: &str) -> String {
let n = url.chars().count();
let a = url.chars().rev().position(|x| x == '-');
if let Some(a) = a {
let b = url.chars().rev().position(|x| x == '.');
if let Some(b) = b {
if a > b {
if url
.chars()
.skip(n - b)
.collect::<String>()
.parse::<i32>()
.is_ok()
{
return url.chars().skip(n - a).collect();
} else {
return url.chars().skip(n - a).take(a - b - 1).collect();
}
} else {
return url.chars().skip(n - a).collect();
}
}
}
"".to_owned()
}
pub fn gen_version() {
println!("cargo:rerun-if-changed=Cargo.toml");
use std::io::prelude::*;
let mut file = File::create("./src/version.rs").unwrap();
for line in read_lines("Cargo.toml").unwrap().flatten() {
let ab: Vec<&str> = line.split('=').map(|x| x.trim()).collect();
if ab.len() == 2 && ab[0] == "version" {
file.write_all(format!("pub const VERSION: &str = {};\n", ab[1]).as_bytes())
.ok();
break;
}
}
// generate build date
let build_date = format!("{}", chrono::Local::now().format("%Y-%m-%d %H:%M"));
file.write_all(
format!("#[allow(dead_code)]\npub const BUILD_DATE: &str = \"{build_date}\";\n").as_bytes(),
)
.ok();
file.sync_all().ok();
}
fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
P: AsRef<Path>,
{
let file = File::open(filename)?;
Ok(io::BufReader::new(file).lines())
}
pub fn is_valid_custom_id(id: &str) -> bool {
regex::Regex::new(r"^[a-zA-Z]\w{5,15}$")
.unwrap()
.is_match(id)
}
pub fn get_version_number(v: &str) -> i64 {
let mut n = 0;
for x in v.split('.') {
n = n * 1000 + x.parse::<i64>().unwrap_or(0);
}
n
}
pub fn get_modified_time(path: &std::path::Path) -> SystemTime {
std::fs::metadata(path)
.map(|m| m.modified().unwrap_or(UNIX_EPOCH))
.unwrap_or(UNIX_EPOCH)
}
pub fn get_created_time(path: &std::path::Path) -> SystemTime {
std::fs::metadata(path)
.map(|m| m.created().unwrap_or(UNIX_EPOCH))
.unwrap_or(UNIX_EPOCH)
}
pub fn get_exe_time() -> SystemTime {
std::env::current_exe().map_or(UNIX_EPOCH, |path| {
let m = get_modified_time(&path);
let c = get_created_time(&path);
if m > c {
m
} else {
c
}
})
}
pub fn get_uuid() -> Vec<u8> {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if let Ok(id) = machine_uid::get() {
return id.into();
}
Config::get_key_pair().1
}
#[inline]
pub fn get_time() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0) as _
}
#[inline]
pub fn is_ipv4_str(id: &str) -> bool {
regex::Regex::new(r"^\d+\.\d+\.\d+\.\d+(:\d+)?$")
.unwrap()
.is_match(id)
}
#[inline]
pub fn is_ipv6_str(id: &str) -> bool {
regex::Regex::new(r"^((([a-fA-F0-9]{1,4}:{1,2})+[a-fA-F0-9]{1,4})|(\[([a-fA-F0-9]{1,4}:{1,2})+[a-fA-F0-9]{1,4}\]:\d+))$")
.unwrap()
.is_match(id)
}
#[inline]
pub fn is_ip_str(id: &str) -> bool {
is_ipv4_str(id) || is_ipv6_str(id)
}
#[inline]
pub fn is_domain_port_str(id: &str) -> bool {
// modified regex for RFC1123 hostname. check https://stackoverflow.com/a/106223 for original version for hostname.
// according to [TLD List](https://data.iana.org/TLD/tlds-alpha-by-domain.txt) version 2023011700,
// there is no digits in TLD, and length is 2~63.
regex::Regex::new(
r"(?i)^([a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z][a-z-]{0,61}[a-z]:\d{1,5}$",
)
.unwrap()
.is_match(id)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_mangle() {
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116));
assert_eq!(addr, AddrMangle::decode(&AddrMangle::encode(addr)));
let addr = "[2001:db8::1]:8080".parse::<SocketAddr>().unwrap();
assert_eq!(addr, AddrMangle::decode(&AddrMangle::encode(addr)));
let addr = "[2001:db8:ff::1111]:80".parse::<SocketAddr>().unwrap();
assert_eq!(addr, AddrMangle::decode(&AddrMangle::encode(addr)));
}
#[test]
fn test_allow_err() {
allow_err!(Err("test err") as Result<(), &str>);
allow_err!(
Err("test err with msg") as Result<(), &str>,
"prompt {}",
"failed"
);
}
#[test]
fn test_ipv6() {
assert!(is_ipv6_str("1:2:3"));
assert!(is_ipv6_str("[ab:2:3]:12"));
assert!(is_ipv6_str("[ABEF:2a:3]:12"));
assert!(!is_ipv6_str("[ABEG:2a:3]:12"));
assert!(!is_ipv6_str("1[ab:2:3]:12"));
assert!(!is_ipv6_str("1.1.1.1"));
assert!(is_ip_str("1.1.1.1"));
assert!(!is_ipv6_str("1:2:"));
assert!(is_ipv6_str("1:2::0"));
assert!(is_ipv6_str("[1:2::0]:1"));
assert!(!is_ipv6_str("[1:2::0]:"));
assert!(!is_ipv6_str("1:2::0]:1"));
}
#[test]
fn test_hostname_port() {
assert!(!is_domain_port_str("a:12"));
assert!(!is_domain_port_str("a.b.c:12"));
assert!(is_domain_port_str("test.com:12"));
assert!(is_domain_port_str("test-UPPER.com:12"));
assert!(is_domain_port_str("some-other.domain.com:12"));
assert!(!is_domain_port_str("under_score:12"));
assert!(!is_domain_port_str("a@bc:12"));
assert!(!is_domain_port_str("1.1.1.1:12"));
assert!(!is_domain_port_str("1.2.3:12"));
assert!(!is_domain_port_str("1.2.3.45:12"));
assert!(!is_domain_port_str("a.b.c:123456"));
assert!(!is_domain_port_str("---:12"));
assert!(!is_domain_port_str(".:12"));
// todo: should we also check for these edge cases?
// out-of-range port
assert!(is_domain_port_str("test.com:0"));
assert!(is_domain_port_str("test.com:98989"));
}
#[test]
fn test_mangle2() {
let addr = "[::ffff:127.0.0.1]:8080".parse().unwrap();
let addr_v4 = "127.0.0.1:8080".parse().unwrap();
assert_eq!(AddrMangle::decode(&AddrMangle::encode(addr)), addr_v4);
assert_eq!(
AddrMangle::decode(&AddrMangle::encode("[::127.0.0.1]:8080".parse().unwrap())),
addr_v4
);
assert_eq!(AddrMangle::decode(&AddrMangle::encode(addr_v4)), addr_v4);
let addr_v6 = "[ef::fe]:8080".parse().unwrap();
assert_eq!(AddrMangle::decode(&AddrMangle::encode(addr_v6)), addr_v6);
let addr_v6 = "[::1]:8080".parse().unwrap();
assert_eq!(AddrMangle::decode(&AddrMangle::encode(addr_v6)), addr_v6);
}
}

View File

@ -1,242 +0,0 @@
use crate::config::Config;
use sodiumoxide::base64;
use std::sync::{Arc, RwLock};
lazy_static::lazy_static! {
pub static ref TEMPORARY_PASSWORD:Arc<RwLock<String>> = Arc::new(RwLock::new(Config::get_auto_password(temporary_password_length())));
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VerificationMethod {
OnlyUseTemporaryPassword,
OnlyUsePermanentPassword,
UseBothPasswords,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApproveMode {
Both,
Password,
Click,
}
// Should only be called in server
pub fn update_temporary_password() {
*TEMPORARY_PASSWORD.write().unwrap() = Config::get_auto_password(temporary_password_length());
}
// Should only be called in server
pub fn temporary_password() -> String {
TEMPORARY_PASSWORD.read().unwrap().clone()
}
fn verification_method() -> VerificationMethod {
let method = Config::get_option("verification-method");
if method == "use-temporary-password" {
VerificationMethod::OnlyUseTemporaryPassword
} else if method == "use-permanent-password" {
VerificationMethod::OnlyUsePermanentPassword
} else {
VerificationMethod::UseBothPasswords // default
}
}
pub fn temporary_password_length() -> usize {
let length = Config::get_option("temporary-password-length");
if length == "8" {
8
} else if length == "10" {
10
} else {
6 // default
}
}
pub fn temporary_enabled() -> bool {
verification_method() != VerificationMethod::OnlyUsePermanentPassword
}
pub fn permanent_enabled() -> bool {
verification_method() != VerificationMethod::OnlyUseTemporaryPassword
}
pub fn has_valid_password() -> bool {
temporary_enabled() && !temporary_password().is_empty()
|| permanent_enabled() && !Config::get_permanent_password().is_empty()
}
pub fn approve_mode() -> ApproveMode {
let mode = Config::get_option("approve-mode");
if mode == "password" {
ApproveMode::Password
} else if mode == "click" {
ApproveMode::Click
} else {
ApproveMode::Both
}
}
pub fn hide_cm() -> bool {
approve_mode() == ApproveMode::Password
&& verification_method() == VerificationMethod::OnlyUsePermanentPassword
&& !Config::get_option("allow-hide-cm").is_empty()
}
const VERSION_LEN: usize = 2;
pub fn encrypt_str_or_original(s: &str, version: &str) -> String {
if decrypt_str_or_original(s, version).1 {
log::error!("Duplicate encryption!");
return s.to_owned();
}
if version == "00" {
if let Ok(s) = encrypt(s.as_bytes()) {
return version.to_owned() + &s;
}
}
s.to_owned()
}
// String: password
// bool: whether decryption is successful
// bool: whether should store to re-encrypt when load
pub fn decrypt_str_or_original(s: &str, current_version: &str) -> (String, bool, bool) {
if s.len() > VERSION_LEN {
let version = &s[..VERSION_LEN];
if version == "00" {
if let Ok(v) = decrypt(s[VERSION_LEN..].as_bytes()) {
return (
String::from_utf8_lossy(&v).to_string(),
true,
version != current_version,
);
}
}
}
(s.to_owned(), false, !s.is_empty())
}
pub fn encrypt_vec_or_original(v: &[u8], version: &str) -> Vec<u8> {
if decrypt_vec_or_original(v, version).1 {
log::error!("Duplicate encryption!");
return v.to_owned();
}
if version == "00" {
if let Ok(s) = encrypt(v) {
let mut version = version.to_owned().into_bytes();
version.append(&mut s.into_bytes());
return version;
}
}
v.to_owned()
}
// Vec<u8>: password
// bool: whether decryption is successful
// bool: whether should store to re-encrypt when load
pub fn decrypt_vec_or_original(v: &[u8], current_version: &str) -> (Vec<u8>, bool, bool) {
if v.len() > VERSION_LEN {
let version = String::from_utf8_lossy(&v[..VERSION_LEN]);
if version == "00" {
if let Ok(v) = decrypt(&v[VERSION_LEN..]) {
return (v, true, version != current_version);
}
}
}
(v.to_owned(), false, !v.is_empty())
}
fn encrypt(v: &[u8]) -> Result<String, ()> {
if !v.is_empty() {
symmetric_crypt(v, true).map(|v| base64::encode(v, base64::Variant::Original))
} else {
Err(())
}
}
fn decrypt(v: &[u8]) -> Result<Vec<u8>, ()> {
if !v.is_empty() {
base64::decode(v, base64::Variant::Original).and_then(|v| symmetric_crypt(&v, false))
} else {
Err(())
}
}
fn symmetric_crypt(data: &[u8], encrypt: bool) -> Result<Vec<u8>, ()> {
use sodiumoxide::crypto::secretbox;
use std::convert::TryInto;
let mut keybuf = crate::get_uuid();
keybuf.resize(secretbox::KEYBYTES, 0);
let key = secretbox::Key(keybuf.try_into().map_err(|_| ())?);
let nonce = secretbox::Nonce([0; secretbox::NONCEBYTES]);
if encrypt {
Ok(secretbox::seal(data, &nonce, &key))
} else {
secretbox::open(data, &nonce, &key)
}
}
mod test {
#[test]
fn test() {
use super::*;
let version = "00";
println!("test str");
let data = "Hello World";
let encrypted = encrypt_str_or_original(data, version);
let (decrypted, succ, store) = decrypt_str_or_original(&encrypted, version);
println!("data: {data}");
println!("encrypted: {encrypted}");
println!("decrypted: {decrypted}");
assert_eq!(data, decrypted);
assert_eq!(version, &encrypted[..2]);
assert!(succ);
assert!(!store);
let (_, _, store) = decrypt_str_or_original(&encrypted, "99");
assert!(store);
assert!(!decrypt_str_or_original(&decrypted, version).1);
assert_eq!(encrypt_str_or_original(&encrypted, version), encrypted);
println!("test vec");
let data: Vec<u8> = vec![1, 2, 3, 4, 5, 6];
let encrypted = encrypt_vec_or_original(&data, version);
let (decrypted, succ, store) = decrypt_vec_or_original(&encrypted, version);
println!("data: {data:?}");
println!("encrypted: {encrypted:?}");
println!("decrypted: {decrypted:?}");
assert_eq!(data, decrypted);
assert_eq!(version.as_bytes(), &encrypted[..2]);
assert!(!store);
assert!(succ);
let (_, _, store) = decrypt_vec_or_original(&encrypted, "99");
assert!(store);
assert!(!decrypt_vec_or_original(&decrypted, version).1);
assert_eq!(encrypt_vec_or_original(&encrypted, version), encrypted);
println!("test original");
let data = version.to_string() + "Hello World";
let (decrypted, succ, store) = decrypt_str_or_original(&data, version);
assert_eq!(data, decrypted);
assert!(store);
assert!(!succ);
let verbytes = version.as_bytes();
let data: Vec<u8> = vec![verbytes[0], verbytes[1], 1, 2, 3, 4, 5, 6];
let (decrypted, succ, store) = decrypt_vec_or_original(&data, version);
assert_eq!(data, decrypted);
assert!(store);
assert!(!succ);
let (_, succ, store) = decrypt_str_or_original("", version);
assert!(!store);
assert!(!succ);
let (_, succ, store) = decrypt_vec_or_original(&[], version);
assert!(!store);
assert!(!succ);
}
}

View File

@ -1 +0,0 @@
include!(concat!(env!("OUT_DIR"), "/protos/mod.rs"));

View File

@ -1,135 +0,0 @@
use crate::{allow_err, anyhow::anyhow, ResultType};
use protobuf::Message;
use std::{net::SocketAddr, sync::Arc};
use tokio::{self, stream::StreamExt, sync::mpsc};
const QUIC_HBB: &[&[u8]] = &[b"hbb"];
const SERVER_NAME: &str = "hbb";
type Sender = mpsc::UnboundedSender<Value>;
type Receiver = mpsc::UnboundedReceiver<Value>;
pub fn new_server(socket: std::net::UdpSocket) -> ResultType<(Server, SocketAddr)> {
let mut transport_config = quinn::TransportConfig::default();
transport_config.stream_window_uni(0);
let mut server_config = quinn::ServerConfig::default();
server_config.transport = Arc::new(transport_config);
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
server_config.protocols(QUIC_HBB);
// server_config.enable_keylog();
// server_config.use_stateless_retry(true);
let mut endpoint = quinn::Endpoint::builder();
endpoint.listen(server_config.build());
let (end, incoming) = endpoint.with_socket(socket)?;
Ok((Server { incoming }, end.local_addr()?))
}
pub async fn new_client(local_addr: &SocketAddr, peer: &SocketAddr) -> ResultType<Connection> {
let mut endpoint = quinn::Endpoint::builder();
let mut client_config = quinn::ClientConfigBuilder::default();
client_config.protocols(QUIC_HBB);
//client_config.enable_keylog();
endpoint.default_client_config(client_config.build());
let (endpoint, _) = endpoint.bind(local_addr)?;
let new_conn = endpoint.connect(peer, SERVER_NAME)?.await?;
Connection::new_for_client(new_conn.connection).await
}
pub struct Server {
incoming: quinn::Incoming,
}
impl Server {
#[inline]
pub async fn next(&mut self) -> ResultType<Option<Connection>> {
Connection::new_for_server(&mut self.incoming).await
}
}
pub struct Connection {
conn: quinn::Connection,
tx: quinn::SendStream,
rx: Receiver,
}
type Value = ResultType<Vec<u8>>;
impl Connection {
async fn new_for_server(incoming: &mut quinn::Incoming) -> ResultType<Option<Self>> {
if let Some(conn) = incoming.next().await {
let quinn::NewConnection {
connection: conn,
// uni_streams,
mut bi_streams,
..
} = conn.await?;
let (tx, rx) = mpsc::unbounded_channel::<Value>();
tokio::spawn(async move {
loop {
let stream = bi_streams.next().await;
if let Some(stream) = stream {
let stream = match stream {
Err(e) => {
tx.send(Err(e.into())).ok();
break;
}
Ok(s) => s,
};
let cloned = tx.clone();
tokio::spawn(async move {
allow_err!(handle_request(stream.1, cloned).await);
});
} else {
tx.send(Err(anyhow!("Reset by the peer"))).ok();
break;
}
}
log::info!("Exit connection outer loop");
});
let tx = conn.open_uni().await?;
Ok(Some(Self { conn, tx, rx }))
} else {
Ok(None)
}
}
async fn new_for_client(conn: quinn::Connection) -> ResultType<Self> {
let (tx, rx_quic) = conn.open_bi().await?;
let (tx_mpsc, rx) = mpsc::unbounded_channel::<Value>();
tokio::spawn(async move {
allow_err!(handle_request(rx_quic, tx_mpsc).await);
});
Ok(Self { conn, tx, rx })
}
#[inline]
pub async fn next(&mut self) -> Option<Value> {
// None is returned when all Sender halves have dropped,
// indicating that no further values can be sent on the channel.
self.rx.recv().await
}
#[inline]
pub fn remote_address(&self) -> SocketAddr {
self.conn.remote_address()
}
#[inline]
pub async fn send_raw(&mut self, bytes: &[u8]) -> ResultType<()> {
self.tx.write_all(bytes).await?;
Ok(())
}
#[inline]
pub async fn send(&mut self, msg: &dyn Message) -> ResultType<()> {
match msg.write_to_bytes() {
Ok(bytes) => self.send_raw(&bytes).await?,
err => allow_err!(err),
}
Ok(())
}
}
async fn handle_request(rx: quinn::RecvStream, tx: Sender) -> ResultType<()> {
Ok(())
}

View File

@ -1,281 +0,0 @@
use crate::{
config::{Config, NetworkType},
tcp::FramedStream,
udp::FramedSocket,
ResultType,
};
use anyhow::Context;
use std::net::SocketAddr;
use tokio::net::ToSocketAddrs;
use tokio_socks::{IntoTargetAddr, TargetAddr};
#[inline]
pub fn check_port<T: std::string::ToString>(host: T, port: i32) -> String {
let host = host.to_string();
if crate::is_ipv6_str(&host) {
if host.starts_with('[') {
return host;
}
return format!("[{host}]:{port}");
}
if !host.contains(':') {
return format!("{host}:{port}");
}
host
}
#[inline]
pub fn increase_port<T: std::string::ToString>(host: T, offset: i32) -> String {
let host = host.to_string();
if crate::is_ipv6_str(&host) {
if host.starts_with('[') {
let tmp: Vec<&str> = host.split("]:").collect();
if tmp.len() == 2 {
let port: i32 = tmp[1].parse().unwrap_or(0);
if port > 0 {
return format!("{}]:{}", tmp[0], port + offset);
}
}
}
} else if host.contains(':') {
let tmp: Vec<&str> = host.split(':').collect();
if tmp.len() == 2 {
let port: i32 = tmp[1].parse().unwrap_or(0);
if port > 0 {
return format!("{}:{}", tmp[0], port + offset);
}
}
}
host
}
pub fn test_if_valid_server(host: &str) -> String {
let host = check_port(host, 0);
use std::net::ToSocketAddrs;
match Config::get_network_type() {
NetworkType::Direct => match host.to_socket_addrs() {
Err(err) => err.to_string(),
Ok(_) => "".to_owned(),
},
NetworkType::ProxySocks => match &host.into_target_addr() {
Err(err) => err.to_string(),
Ok(_) => "".to_owned(),
},
}
}
pub trait IsResolvedSocketAddr {
fn resolve(&self) -> Option<&SocketAddr>;
}
impl IsResolvedSocketAddr for SocketAddr {
fn resolve(&self) -> Option<&SocketAddr> {
Some(self)
}
}
impl IsResolvedSocketAddr for String {
fn resolve(&self) -> Option<&SocketAddr> {
None
}
}
impl IsResolvedSocketAddr for &str {
fn resolve(&self) -> Option<&SocketAddr> {
None
}
}
#[inline]
pub async fn connect_tcp<
't,
T: IntoTargetAddr<'t> + ToSocketAddrs + IsResolvedSocketAddr + std::fmt::Display,
>(
target: T,
ms_timeout: u64,
) -> ResultType<FramedStream> {
connect_tcp_local(target, None, ms_timeout).await
}
pub async fn connect_tcp_local<
't,
T: IntoTargetAddr<'t> + ToSocketAddrs + IsResolvedSocketAddr + std::fmt::Display,
>(
target: T,
local: Option<SocketAddr>,
ms_timeout: u64,
) -> ResultType<FramedStream> {
if let Some(conf) = Config::get_socks() {
return FramedStream::connect(
conf.proxy.as_str(),
target,
local,
conf.username.as_str(),
conf.password.as_str(),
ms_timeout,
)
.await;
}
if let Some(target) = target.resolve() {
if let Some(local) = local {
if local.is_ipv6() && target.is_ipv4() {
let target = query_nip_io(target).await?;
return FramedStream::new(target, Some(local), ms_timeout).await;
}
}
}
FramedStream::new(target, local, ms_timeout).await
}
#[inline]
pub fn is_ipv4(target: &TargetAddr<'_>) -> bool {
match target {
TargetAddr::Ip(addr) => addr.is_ipv4(),
_ => true,
}
}
#[inline]
pub async fn query_nip_io(addr: &SocketAddr) -> ResultType<SocketAddr> {
tokio::net::lookup_host(format!("{}.nip.io:{}", addr.ip(), addr.port()))
.await?
.find(|x| x.is_ipv6())
.context("Failed to get ipv6 from nip.io")
}
#[inline]
pub fn ipv4_to_ipv6(addr: String, ipv4: bool) -> String {
if !ipv4 && crate::is_ipv4_str(&addr) {
if let Some(ip) = addr.split(':').next() {
return addr.replace(ip, &format!("{ip}.nip.io"));
}
}
addr
}
async fn test_target(target: &str) -> ResultType<SocketAddr> {
if let Ok(Ok(s)) = super::timeout(1000, tokio::net::TcpStream::connect(target)).await {
if let Ok(addr) = s.peer_addr() {
return Ok(addr);
}
}
tokio::net::lookup_host(target)
.await?
.next()
.context(format!("Failed to look up host for {target}"))
}
#[inline]
pub async fn new_udp_for(
target: &str,
ms_timeout: u64,
) -> ResultType<(FramedSocket, TargetAddr<'static>)> {
let (ipv4, target) = if NetworkType::Direct == Config::get_network_type() {
let addr = test_target(target).await?;
(addr.is_ipv4(), addr.into_target_addr()?)
} else {
(true, target.into_target_addr()?)
};
Ok((
new_udp(Config::get_any_listen_addr(ipv4), ms_timeout).await?,
target.to_owned(),
))
}
async fn new_udp<T: ToSocketAddrs>(local: T, ms_timeout: u64) -> ResultType<FramedSocket> {
match Config::get_socks() {
None => Ok(FramedSocket::new(local).await?),
Some(conf) => {
let socket = FramedSocket::new_proxy(
conf.proxy.as_str(),
local,
conf.username.as_str(),
conf.password.as_str(),
ms_timeout,
)
.await?;
Ok(socket)
}
}
}
pub async fn rebind_udp_for(
target: &str,
) -> ResultType<Option<(FramedSocket, TargetAddr<'static>)>> {
if Config::get_network_type() != NetworkType::Direct {
return Ok(None);
}
let addr = test_target(target).await?;
let v4 = addr.is_ipv4();
Ok(Some((
FramedSocket::new(Config::get_any_listen_addr(v4)).await?,
addr.into_target_addr()?.to_owned(),
)))
}
#[cfg(test)]
mod tests {
use std::net::ToSocketAddrs;
use super::*;
#[test]
fn test_nat64() {
test_nat64_async();
}
#[tokio::main(flavor = "current_thread")]
async fn test_nat64_async() {
assert_eq!(ipv4_to_ipv6("1.1.1.1".to_owned(), true), "1.1.1.1");
assert_eq!(ipv4_to_ipv6("1.1.1.1".to_owned(), false), "1.1.1.1.nip.io");
assert_eq!(
ipv4_to_ipv6("1.1.1.1:8080".to_owned(), false),
"1.1.1.1.nip.io:8080"
);
assert_eq!(
ipv4_to_ipv6("rustdesk.com".to_owned(), false),
"rustdesk.com"
);
if ("rustdesk.com:80")
.to_socket_addrs()
.unwrap()
.next()
.unwrap()
.is_ipv6()
{
assert!(query_nip_io(&"1.1.1.1:80".parse().unwrap())
.await
.unwrap()
.is_ipv6());
return;
}
assert!(query_nip_io(&"1.1.1.1:80".parse().unwrap()).await.is_err());
}
#[test]
fn test_test_if_valid_server() {
assert!(!test_if_valid_server("a").is_empty());
// on Linux, "1" is resolved to "0.0.0.1"
assert!(test_if_valid_server("1.1.1.1").is_empty());
assert!(test_if_valid_server("1.1.1.1:1").is_empty());
}
#[test]
fn test_check_port() {
assert_eq!(check_port("[1:2]:12", 32), "[1:2]:12");
assert_eq!(check_port("1:2", 32), "[1:2]:32");
assert_eq!(check_port("z1:2", 32), "z1:2");
assert_eq!(check_port("1.1.1.1", 32), "1.1.1.1:32");
assert_eq!(check_port("1.1.1.1:32", 32), "1.1.1.1:32");
assert_eq!(check_port("test.com:32", 0), "test.com:32");
assert_eq!(increase_port("[1:2]:12", 1), "[1:2]:13");
assert_eq!(increase_port("1.2.2.4:12", 1), "1.2.2.4:13");
assert_eq!(increase_port("1.2.2.4", 1), "1.2.2.4");
assert_eq!(increase_port("test.com", 1), "test.com");
assert_eq!(increase_port("test.com:13", 4), "test.com:17");
assert_eq!(increase_port("1:13", 4), "1:13");
assert_eq!(increase_port("22:1:13", 4), "22:1:13");
assert_eq!(increase_port("z1:2", 1), "z1:3");
}
}

View File

@ -1,335 +0,0 @@
use crate::{bail, bytes_codec::BytesCodec, ResultType};
use anyhow::Context as AnyhowCtx;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use protobuf::Message;
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::{
io::{self, Error, ErrorKind},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs},
};
use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs};
use tokio_util::codec::Framed;
pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
pub struct DynTcpStream(Box<dyn TcpStreamTrait + Send + Sync>);
pub struct FramedStream(
Framed<DynTcpStream, BytesCodec>,
SocketAddr,
Option<(Key, u64, u64)>,
u64,
);
impl Deref for FramedStream {
type Target = Framed<DynTcpStream, BytesCodec>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for FramedStream {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Deref for DynTcpStream {
type Target = Box<dyn TcpStreamTrait + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for DynTcpStream {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
let socket = match addr {
std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?,
std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?,
};
if reuse {
// windows has no reuse_port, but its reuse_address
// almost equals to unix's reuse_port + reuse_address,
// though may introduce nondeterministic behavior.
// illumos has no support for SO_REUSEPORT
#[cfg(all(unix, not(target_os = "illumos")))]
socket.set_reuseport(true)?;
socket.set_reuseaddr(true)?;
}
socket.bind(addr)?;
Ok(socket)
}
impl FramedStream {
pub async fn new<T: ToSocketAddrs + std::fmt::Display>(
remote_addr: T,
local_addr: Option<SocketAddr>,
ms_timeout: u64,
) -> ResultType<Self> {
for remote_addr in lookup_host(&remote_addr).await? {
let local = if let Some(addr) = local_addr {
addr
} else {
crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4())
};
if let Ok(socket) = new_socket(local, true) {
if let Ok(Ok(stream)) =
super::timeout(ms_timeout, socket.connect(remote_addr)).await
{
stream.set_nodelay(true).ok();
let addr = stream.local_addr()?;
return Ok(Self(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr,
None,
0,
));
}
}
}
bail!(format!("Failed to connect to {remote_addr}"));
}
pub async fn connect<'a, 't, P, T>(
proxy: P,
target: T,
local_addr: Option<SocketAddr>,
username: &'a str,
password: &'a str,
ms_timeout: u64,
) -> ResultType<Self>
where
P: ToProxyAddrs,
T: IntoTargetAddr<'t>,
{
if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await {
let local = if let Some(addr) = local_addr {
addr
} else {
crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
};
let stream =
super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??;
stream.set_nodelay(true).ok();
let stream = if username.trim().is_empty() {
super::timeout(
ms_timeout,
Socks5Stream::connect_with_socket(stream, target),
)
.await??
} else {
super::timeout(
ms_timeout,
Socks5Stream::connect_with_password_and_socket(
stream, target, username, password,
),
)
.await??
};
let addr = stream.local_addr()?;
return Ok(Self(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr,
None,
0,
));
}
bail!("could not resolve to any address");
}
pub fn local_addr(&self) -> SocketAddr {
self.1
}
pub fn set_send_timeout(&mut self, ms: u64) {
self.3 = ms;
}
pub fn from(stream: impl TcpStreamTrait + Send + Sync + 'static, addr: SocketAddr) -> Self {
Self(
Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
addr,
None,
0,
)
}
pub fn set_raw(&mut self) {
self.0.codec_mut().set_raw();
self.2 = None;
}
pub fn is_secured(&self) -> bool {
self.2.is_some()
}
#[inline]
pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
self.send_raw(msg.write_to_bytes()?).await
}
#[inline]
pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
let mut msg = msg;
if let Some(key) = self.2.as_mut() {
key.1 += 1;
let nonce = Self::get_nonce(key.1);
msg = secretbox::seal(&msg, &nonce, &key.0);
}
self.send_bytes(bytes::Bytes::from(msg)).await?;
Ok(())
}
#[inline]
pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
if self.3 > 0 {
super::timeout(self.3, self.0.send(bytes)).await??;
} else {
self.0.send(bytes).await?;
}
Ok(())
}
#[inline]
pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
let mut res = self.0.next().await;
if let Some(key) = self.2.as_mut() {
if let Some(Ok(bytes)) = res.as_mut() {
key.2 += 1;
let nonce = Self::get_nonce(key.2);
match secretbox::open(bytes, &nonce, &key.0) {
Ok(res) => {
bytes.clear();
bytes.put_slice(&res);
}
Err(()) => {
return Some(Err(Error::new(ErrorKind::Other, "decryption error")));
}
}
}
}
res
}
#[inline]
pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
if let Ok(res) = super::timeout(ms, self.next()).await {
res
} else {
None
}
}
pub fn set_key(&mut self, key: Key) {
self.2 = Some((key, 0, 0));
}
fn get_nonce(seqnum: u64) -> Nonce {
let mut nonce = Nonce([0u8; secretbox::NONCEBYTES]);
nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_le_bytes());
nonce
}
}
const DEFAULT_BACKLOG: u32 = 128;
pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType<TcpListener> {
if !reuse {
Ok(TcpListener::bind(addr).await?)
} else {
let addr = lookup_host(&addr)
.await?
.next()
.context("could not resolve to any address")?;
new_socket(addr, true)?
.listen(DEFAULT_BACKLOG)
.map_err(anyhow::Error::msg)
}
}
pub async fn listen_any(port: u16, reuse: bool) -> ResultType<TcpListener> {
if let Ok(mut socket) = TcpSocket::new_v6() {
if reuse {
// windows has no reuse_port, but its reuse_address
// almost equals to unix's reuse_port + reuse_address,
// though may introduce nondeterministic behavior.
// illumos has no support for SO_REUSEPORT
#[cfg(all(unix, not(target_os = "illumos")))]
socket.set_reuseport(true).ok();
socket.set_reuseaddr(true).ok();
}
#[cfg(unix)]
{
use std::os::unix::io::{FromRawFd, IntoRawFd};
let raw_fd = socket.into_raw_fd();
let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) };
sock2.set_only_v6(false).ok();
socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) };
}
#[cfg(windows)]
{
use std::os::windows::prelude::{FromRawSocket, IntoRawSocket};
let raw_socket = socket.into_raw_socket();
let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) };
sock2.set_only_v6(false).ok();
socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) };
}
if socket
.bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port))
.is_ok()
{
if let Ok(l) = socket.listen(DEFAULT_BACKLOG) {
return Ok(l);
}
}
}
let s = TcpSocket::new_v4()?;
s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?;
Ok(s.listen(DEFAULT_BACKLOG)?)
}
impl Unpin for DynTcpStream {}
impl AsyncRead for DynTcpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf)
}
}
impl AsyncWrite for DynTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
}
}
impl<R: AsyncRead + AsyncWrite + Unpin> TcpStreamTrait for R {}

View File

@ -1,171 +0,0 @@
use crate::ResultType;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use protobuf::Message;
use socket2::{Domain, Socket, Type};
use std::net::SocketAddr;
use tokio::net::{lookup_host, ToSocketAddrs, UdpSocket};
use tokio_socks::{udp::Socks5UdpFramed, IntoTargetAddr, TargetAddr, ToProxyAddrs};
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
pub enum FramedSocket {
Direct(UdpFramed<BytesCodec>),
ProxySocks(Socks5UdpFramed),
}
fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result<Socket, std::io::Error> {
let socket = match addr {
SocketAddr::V4(..) => Socket::new(Domain::ipv4(), Type::dgram(), None),
SocketAddr::V6(..) => Socket::new(Domain::ipv6(), Type::dgram(), None),
}?;
if reuse {
// windows has no reuse_port, but its reuse_address
// almost equals to unix's reuse_port + reuse_address,
// though may introduce nondeterministic behavior.
// illumos has no support for SO_REUSEPORT
#[cfg(all(unix, not(target_os = "illumos")))]
socket.set_reuse_port(true)?;
socket.set_reuse_address(true)?;
}
// only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full
socket.set_nonblocking(true)?;
if buf_size > 0 {
socket.set_recv_buffer_size(buf_size).ok();
}
log::info!(
"Receive buf size of udp {}: {:?}",
addr,
socket.recv_buffer_size()
);
if addr.is_ipv6() && addr.ip().is_unspecified() && addr.port() > 0 {
socket.set_only_v6(false).ok();
}
socket.bind(&addr.into())?;
Ok(socket)
}
impl FramedSocket {
pub async fn new<T: ToSocketAddrs>(addr: T) -> ResultType<Self> {
Self::new_reuse(addr, false, 0).await
}
pub async fn new_reuse<T: ToSocketAddrs>(
addr: T,
reuse: bool,
buf_size: usize,
) -> ResultType<Self> {
let addr = lookup_host(&addr)
.await?
.next()
.context("could not resolve to any address")?;
Ok(Self::Direct(UdpFramed::new(
UdpSocket::from_std(new_socket(addr, reuse, buf_size)?.into_udp_socket())?,
BytesCodec::new(),
)))
}
pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T: ToSocketAddrs>(
proxy: P,
local: T,
username: &'a str,
password: &'a str,
ms_timeout: u64,
) -> ResultType<Self> {
let framed = if username.trim().is_empty() {
super::timeout(ms_timeout, Socks5UdpFramed::connect(proxy, Some(local))).await??
} else {
super::timeout(
ms_timeout,
Socks5UdpFramed::connect_with_password(proxy, Some(local), username, password),
)
.await??
};
log::trace!(
"Socks5 udp connected, local addr: {:?}, target addr: {}",
framed.local_addr(),
framed.socks_addr()
);
Ok(Self::ProxySocks(framed))
}
#[inline]
pub async fn send(
&mut self,
msg: &impl Message,
addr: impl IntoTargetAddr<'_>,
) -> ResultType<()> {
let addr = addr.into_target_addr()?.to_owned();
let send_data = Bytes::from(msg.write_to_bytes()?);
match self {
Self::Direct(f) => {
if let TargetAddr::Ip(addr) = addr {
f.send((send_data, addr)).await?
}
}
Self::ProxySocks(f) => f.send((send_data, addr)).await?,
};
Ok(())
}
// https://stackoverflow.com/a/68733302/1926020
#[inline]
pub async fn send_raw(
&mut self,
msg: &'static [u8],
addr: impl IntoTargetAddr<'static>,
) -> ResultType<()> {
let addr = addr.into_target_addr()?.to_owned();
match self {
Self::Direct(f) => {
if let TargetAddr::Ip(addr) = addr {
f.send((Bytes::from(msg), addr)).await?
}
}
Self::ProxySocks(f) => f.send((Bytes::from(msg), addr)).await?,
};
Ok(())
}
#[inline]
pub async fn next(&mut self) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
match self {
Self::Direct(f) => match f.next().await {
Some(Ok((data, addr))) => {
Some(Ok((data, addr.into_target_addr().ok()?.to_owned())))
}
Some(Err(e)) => Some(Err(anyhow!(e))),
None => None,
},
Self::ProxySocks(f) => match f.next().await {
Some(Ok((data, _))) => Some(Ok((data.data, data.dst_addr))),
Some(Err(e)) => Some(Err(anyhow!(e))),
None => None,
},
}
}
#[inline]
pub async fn next_timeout(
&mut self,
ms: u64,
) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
if let Ok(res) =
tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await
{
res
} else {
None
}
}
pub fn local_addr(&self) -> Option<SocketAddr> {
if let FramedSocket::Direct(x) = self {
if let Ok(v) = x.get_ref().local_addr() {
return Some(v);
}
}
None
}
}

View File

@ -1,7 +1,6 @@
use clap::App;
use hbb_common::{
anyhow::{Context, Result},
log, ResultType,
allow_err, anyhow::{Context, Result}, get_version_number, log, tokio, ResultType
};
use ini::Ini;
use sodiumoxide::crypto::sign;
@ -189,3 +188,31 @@ pub async fn listen_signal() -> Result<()> {
let () = std::future::pending().await;
unreachable!();
}
pub fn check_software_update() {
const ONE_DAY_IN_SECONDS: u64 = 60 * 60 * 24;
std::thread::spawn(move || loop {
std::thread::spawn(move || allow_err!(check_software_update_()));
std::thread::sleep(std::time::Duration::from_secs(ONE_DAY_IN_SECONDS));
});
}
#[tokio::main(flavor = "current_thread")]
async fn check_software_update_() -> hbb_common::ResultType<()> {
let (request, url) = hbb_common::version_check_request(hbb_common::VER_TYPE_RUSTDESK_SERVER.to_string());
let latest_release_response = reqwest::Client::builder().build()?
.post(url)
.json(&request)
.send()
.await?;
let bytes = latest_release_response.bytes().await?;
let resp: hbb_common::VersionCheckResponse = serde_json::from_slice(&bytes)?;
let response_url = resp.url;
let latest_release_version = response_url.rsplit('/').next().unwrap_or_default();
if get_version_number(&latest_release_version) > get_version_number(crate::version::VERSION) {
log::info!("new version is available: {}", latest_release_version);
}
Ok(())
}

View File

@ -31,6 +31,7 @@ fn main() -> ResultType<()> {
}
let rmem = get_arg("rmem").parse::<usize>().unwrap_or(RMEM);
let serial: i32 = get_arg("serial").parse().unwrap_or(0);
crate::common::check_software_update();
RendezvousServer::start(port, serial, &get_arg_or("key", "-".to_owned()), rmem)?;
Ok(())
}

View File

@ -85,7 +85,7 @@ pub async fn start(port: &str, key: &str) -> ResultType<()> {
let main_task = async move {
loop {
log::info!("Start");
io_loop(listen_any(port, true).await?, listen_any(port2, true).await?, &key).await;
io_loop(listen_any(port).await?, listen_any(port2).await?, &key).await;
}
};
let listen_signal = crate::common::listen_signal();

View File

@ -1319,7 +1319,7 @@ async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket>
#[inline]
async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
let s = listen_any(port as _, true).await?;
let s = listen_any(port as _).await?;
log::debug!("listen on tcp {:?}", s.local_addr());
Ok(s)
}