peer_online_state: response online state bits

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2022-07-27 00:35:40 +08:00
parent a4940f4634
commit f7fc45a3d2
2 changed files with 15 additions and 38 deletions

View File

@ -154,8 +154,7 @@ message OnlineRequest {
} }
message OnlineResponse { message OnlineResponse {
repeated string onlines = 1; bytes states = 1;
repeated string offlines = 2;
} }
message RendezvousMessage { message RendezvousMessage {

View File

@ -89,12 +89,7 @@ enum LoopFailure {
impl RendezvousServer { impl RendezvousServer {
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
pub async fn start( pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
port: i32,
serial: i32,
key: &str,
rmem: usize,
) -> ResultType<()> {
let (key, sk) = Self::get_server_sk(key); let (key, sk) = Self::get_server_sk(key);
let addr = format!("0.0.0.0:{}", port); let addr = format!("0.0.0.0:{}", port);
let addr2 = format!("0.0.0.0:{}", port - 1); let addr2 = format!("0.0.0.0:{}", port - 1);
@ -762,27 +757,26 @@ impl RendezvousServer {
stream: &mut FramedStream, stream: &mut FramedStream,
peers: Vec<String>, peers: Vec<String>,
) -> ResultType<()> { ) -> ResultType<()> {
let mut onlines = Vec::new(); let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
for peer_id in &peers { for i in 0..peers.len() {
let peer_id = &peers[i];
// bytes index from left to right
let states_idx = i / 8;
let bit_idx = 7 - i % 8;
if let Some(peer) = self.pm.get_in_memory(&peer_id).await { if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
let (elapsed, _) = { let (elapsed, _) = {
let r = peer.read().await; let r = peer.read().await;
(r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
}; };
if elapsed < REG_TIMEOUT { if elapsed < REG_TIMEOUT {
onlines.push(peer_id.clone()); states[states_idx] |= 0x01 << bit_idx;
} }
} }
} }
let offlines = peers
.into_iter()
.filter(|p| onlines.iter().position(|r| r == p).is_none())
.collect::<Vec<_>>();
let mut msg_out = RendezvousMessage::new(); let mut msg_out = RendezvousMessage::new();
msg_out.set_online_response(OnlineResponse { msg_out.set_online_response(OnlineResponse {
onlines, states: states.into(),
offlines,
..Default::default() ..Default::default()
}); });
stream.send(&msg_out).await?; stream.send(&msg_out).await?;
@ -1079,29 +1073,19 @@ impl RendezvousServer {
Some(rendezvous_message::Union::OnlineRequest(or)) => { Some(rendezvous_message::Union::OnlineRequest(or)) => {
allow_err!(rs.handle_online_request(&mut stream, or.peers).await); allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
} }
_ => { _ => {}
}
} }
} }
} }
}); });
} }
async fn handle_listener( async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
&self,
stream: TcpStream,
addr: SocketAddr,
key: &str,
ws: bool,
) {
log::debug!("Tcp connection from {:?}, ws: {}", addr, ws); log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
let mut rs = self.clone(); let mut rs = self.clone();
let key = key.to_owned(); let key = key.to_owned();
tokio::spawn(async move { tokio::spawn(async move {
allow_err!( allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
rs.handle_listener_inner(stream, addr, &key, ws)
.await
);
}); });
} }
@ -1121,10 +1105,7 @@ impl RendezvousServer {
while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await { while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
match msg { match msg {
tungstenite::Message::Binary(bytes) => { tungstenite::Message::Binary(bytes) => {
if !self if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
.handle_tcp(&bytes, &mut sink, addr, key, ws)
.await
{
break; break;
} }
} }
@ -1135,10 +1116,7 @@ impl RendezvousServer {
let (a, mut b) = Framed::new(stream, BytesCodec::new()).split(); let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
sink = Some(Sink::TcpStream(a)); sink = Some(Sink::TcpStream(a));
while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await { while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
if !self if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
.handle_tcp(&bytes, &mut sink, addr, key, ws)
.await
{
break; break;
} }
} }