Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 80 additions & 5 deletions src/executor/vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,21 @@ pub(crate) enum VsockState {
ReceiveRequest,
Connected,
Connecting,
/// The peer sent a graceful `Op::Shutdown`. Buffered data may still be
/// read; once drained, reads report EOF.
Shutdown,
/// The peer (or the device) sent an abortive `Op::Rst`. Buffered data may
/// still be read; once drained, reads report `ECONNRESET`.
Reset,
}

pub(crate) const RAW_SOCKET_BUFFER_SIZE: usize = 256 * 1024;

/// Identifies an established connection by its local (listen) port and the
/// remote endpoint `(remote_cid, remote_port)`. Multiple connections may share
/// one local port, mirroring how TCP demultiplexes by the connection 4-tuple.
pub(crate) type ConnKey = (u32, u32, u32);

#[derive(Debug)]
pub(crate) struct RawSocket {
pub remote_cid: u32,
Expand Down Expand Up @@ -77,8 +87,20 @@ async fn vsock_run() {
let type_ = Type::try_from(header.type_.to_ne()).unwrap();
let mut vsock_guard = VSOCK_MAP.lock();
let header_cid: u32 = header.src_cid.to_ne().try_into().unwrap();
let remote_port = header.src_port.to_ne();

let Some(raw) = vsock_guard.get_mut_socket(port) else {
// Packets for an established connection address the local listen
// port but belong to a specific remote endpoint, so route them to
// the connection entry keyed by `(port, remote_cid, remote_port)`.
// `Op::Request` (and outbound-connect responses) have no such entry
// yet and fall back to the listener/connect socket in `port_map`.
let raw = if let Some(conn) =
vsock_guard.get_mut_connection((port, header_cid, remote_port))
{
conn
} else if let Some(s) = vsock_guard.get_mut_socket(port) {
s
} else {
return;
};

Expand Down Expand Up @@ -113,6 +135,16 @@ async fn vsock_run() {
} else if op == Op::Shutdown {
if raw.remote_cid == header_cid {
raw.state = VsockState::Shutdown;
raw.rx_waker.wake();
raw.tx_waker.wake();
} else {
trace!("Receive message from invalid source {header_cid}");
}
} else if op == Op::Rst {
if raw.remote_cid == header_cid {
raw.state = VsockState::Reset;
raw.rx_waker.wake();
raw.tx_waker.wake();
} else {
trace!("Receive message from invalid source {header_cid}");
}
Expand All @@ -123,6 +155,13 @@ async fn vsock_run() {
raw.peer_fwd_cnt = header.fwd_cnt.to_ne();
raw.tx_waker.wake();
}
} else if op == Op::Request {
// A connection request the listener cannot service right now
// (e.g. a previous request is still pending accept on this
// port). Reply with a reset so the peer fails fast instead of
// blocking until it times out.
hdr = Some(*header);
fwd_cnt = raw.fwd_cnt;
} else if raw.remote_cid == header_cid {
hdr = Some(*header);
fwd_cnt = raw.fwd_cnt;
Expand Down Expand Up @@ -161,13 +200,19 @@ async fn vsock_run() {
}

pub(crate) struct VsockMap {
/// Listeners (keyed by listen port) and outbound-connect sockets (keyed by
/// a synthetic ephemeral port).
port_map: BTreeMap<u32, RawSocket>,
/// Established inbound connections, keyed by `(local_port, remote_cid,
/// remote_port)`, so several connections can share one listen port.
conn_map: BTreeMap<ConnKey, RawSocket>,
}

impl VsockMap {
pub const fn new() -> Self {
Self {
port_map: BTreeMap::new(),
conn_map: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -198,17 +243,47 @@ impl VsockMap {
Err(Errno::Badf)
}

pub fn get_socket(&self, port: u32) -> Option<&RawSocket> {
self.port_map.get(&port)
}

pub fn get_mut_socket(&mut self, port: u32) -> Option<&mut RawSocket> {
self.port_map.get_mut(&port)
}

pub fn get_mut_connection(&mut self, key: ConnKey) -> Option<&mut RawSocket> {
self.conn_map.get_mut(&key)
}

pub fn remove_socket(&mut self, port: u32) {
self.port_map.remove(&port);
}

pub fn remove_connection(&mut self, key: ConnKey) {
self.conn_map.remove(&key);
}

/// Move the pending connection on `listen_port` (in state `ReceiveRequest`)
/// into `conn_map` keyed by `(listen_port, remote_cid, remote_port)`, then
/// reset the listener entry to `Listen` so it can accept further
/// connections. Returns the new connection's key.
pub fn establish(&mut self, listen_port: u32) -> io::Result<ConnKey> {
let listener = self.port_map.get_mut(&listen_port).ok_or(Errno::Inval)?;
let key = (listen_port, listener.remote_cid, listener.remote_port);

// Build the connection entry from the negotiated handshake state, then
// reset the listener's fields in place. Resetting in place (rather than
// replacing the whole struct) preserves the listener's wakers, so an
// `accept()` future already parked on it is not lost.
let mut conn = RawSocket::new(VsockState::Connected);
conn.remote_cid = listener.remote_cid;
conn.remote_port = listener.remote_port;
conn.peer_buf_alloc = listener.peer_buf_alloc;

listener.state = VsockState::Listen;
listener.remote_cid = 0;
listener.remote_port = 0;
listener.peer_buf_alloc = 0;

self.conn_map.insert(key, conn);
Ok(key)
}
}

pub(crate) fn init() {
Expand Down
6 changes: 0 additions & 6 deletions src/fd/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ pub(crate) enum Fd {
#[cfg(feature = "udp")]
UdpSocket(udp::Socket),
#[cfg(feature = "virtio-vsock")]
VsockNullSocket(vsock::NullSocket),
#[cfg(feature = "virtio-vsock")]
VsockSocket(vsock::Socket),
#[cfg(feature = "virtio-fs")]
VirtioFsFileHandle(VirtioFsFileHandle),
Expand Down Expand Up @@ -94,8 +92,6 @@ fd_from! {
#[cfg(feature = "udp")]
UdpSocket(udp::Socket),
#[cfg(feature = "virtio-vsock")]
VsockNullSocket(vsock::NullSocket),
#[cfg(feature = "virtio-vsock")]
VsockSocket(vsock::Socket),
#[cfg(feature = "virtio-fs")]
VirtioFsFileHandle(VirtioFsFileHandle),
Expand Down Expand Up @@ -128,8 +124,6 @@ impl ObjectInterface for Fd {
#[cfg(feature = "udp")]
Self::UdpSocket(fd) => fd,
#[cfg(feature = "virtio-vsock")]
Self::VsockNullSocket(fd) => fd,
#[cfg(feature = "virtio-vsock")]
Self::VsockSocket(fd) => fd,
#[cfg(feature = "virtio-fs")]
Self::VirtioFsFileHandle(fd) => fd,
Expand Down
Loading
Loading