I applied what I could, and squashed a number of patches, including those
from "[PATCH 0/6] More migration improvements". I didn't test the full flow
here.

David Gibson (2):
  migrate: Make more handling common rather than vhost-user specific
  migrate: Don't handle the migration channel through epoll

Stefano Brivio (6):
  flow_table: Use size in extern declaration for flowtab, export hash table
  Introduce facilities for guest migration on top of vhost-user infrastructure
  Add interfaces and configuration bits for passt-repair
  flow, tcp: Basic pre-migration source handler to dump sequence numbers
  vhost_user: Make source quit after reporting migration state
  Implement target side of migration

 Makefile     |  14 +--
 conf.c       |  44 ++++++-
 epoll_type.h |   6 +-
 flow.c       | 104 ++++++++++++++++-
 flow.h       |   4 +
 flow_table.h |   5 +-
 migrate.c    | 319 +++++++++++++++++++++++++++++++++++++++++++++++++++
 migrate.h    |  96 ++++++++++++++++
 passt.1      |  11 ++
 passt.c      |  17 ++-
 passt.h      |  17 +++
 repair.c     | 193 +++++++++++++++++++++++++++
 repair.h     |  16 +++
 tap.c        |  65 +----------
 tcp.c        | 170 +++++++++++++++++++++++++++
 tcp_conn.h   |   7 ++
 util.c       |  62 ++++++++++
 util.h       |   1 +
 vhost_user.c |  66 +++--------
 virtio.h     |   4 -
 vu_common.c  |  49 +-------
 vu_common.h  |   2 +-
 22 files changed, 1087 insertions(+), 185 deletions(-)
 create mode 100644 migrate.c
 create mode 100644 migrate.h
 create mode 100644 repair.c
 create mode 100644 repair.h

-- 
2.43.0
This can all be thrown away once we stop copying the flow table. Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- flow.c | 6 +----- flow_table.h | 5 ++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/flow.c b/flow.c index a6fe6d1..3b8dd0e 100644 --- a/flow.c +++ b/flow.c @@ -109,12 +109,8 @@ unsigned flow_first_free; union flow flowtab[FLOW_MAX]; static const union flow *flow_new_entry; /* = NULL */ -/* Hash table to index it */ -#define FLOW_HASH_LOAD 70 /* % */ -#define FLOW_HASH_SIZE ((2 * FLOW_MAX * 100 / FLOW_HASH_LOAD)) - /* Table for lookup from flowside information */ -static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE]; +flow_sidx_t flow_hashtab[FLOW_HASH_SIZE]; static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX, "Safe linear probing requires hash table with more entries than the number of sides in the flow table"); diff --git a/flow_table.h b/flow_table.h index eeb6f41..be4f841 100644 --- a/flow_table.h +++ b/flow_table.h @@ -41,7 +41,10 @@ union flow { /* Global Flow Table */ extern unsigned flow_first_free; -extern union flow flowtab[]; +extern union flow flowtab[FLOW_MAX]; +#define FLOW_HASH_LOAD 70 /* % */ +#define FLOW_HASH_SIZE ((2 * FLOW_MAX * 100 / FLOW_HASH_LOAD)) +extern flow_sidx_t flow_hashtab[FLOW_HASH_SIZE]; /** * flow_foreach_sidei() - 'for' type macro to step through each side of flow -- 2.43.0
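The explicit bound on flowtab, and exporting flow_hashtab together with its
size, is what lets the next patch describe both tables as verbatim data
sections: sizeof() in another translation unit needs a complete array type.
A trivial editor's illustration, with hypothetical names, not passt code:

/* Hypothetical example: why the array bound in the extern declaration
 * matters for sizeof()-based verbatim transfers.
 */
extern int with_bound[16];
extern int without_bound[];

_Static_assert(sizeof(with_bound) == 16 * sizeof(int), "complete type");
/* sizeof(without_bound) would not compile: the array type is incomplete */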
Add two sets (source or target) of three functions each for passt in
vhost-user mode, triggered by activity on the file descriptor passed via
VHOST_USER_PROTOCOL_F_DEVICE_STATE:

- migrate_source_pre() and migrate_target_pre() are called to prepare for
  migration, before data is transferred

- migrate_source() sends, and migrate_target() receives, migration data

- migrate_source_post() and migrate_target_post() are responsible for any
  post-migration task

Callbacks are added to these functions with arrays of function pointers in
migrate.c. Migration handlers are versioned.

Versioned descriptions of data sections will be added to the data_versions
array, which points to versioned iovec arrays. Version 1 is currently empty
and will be filled in by subsequent patches.

The source announces the data version to be used and informs the peer about
endianness, and about the sizes of void *, time_t, flow entries and flow
hash table entries.

The target checks if the version of the source is still supported. If it's
not, it aborts the migration.

From David: vu_migrate() moves to migrate.c now. On top of this, from David:

--

migrate: Handle sending header section from data sections

Currently the global state header is included in the list of data sections
to send for migration. This makes for an asymmetry between the source and
target sides: for the source, the header is sent after the 'pre' handlers
along with all the rest of the data. For the target side, the header must
be read first (before the 'pre' handlers), so that we know the version
which determines what all the rest of the data will be.

Change this so that the header is handled explicitly on both the source and
target side. This will make some future changes simpler as well.

--

migrate: Make migration handlers simpler and more flexible

Currently the structure of the migration callbacks is quite rigid. There
are separate tables for pre and post handlers, for source and target, and
those only prepare or fix things up, with no reading or writing of the
stream. The actual reading and writing is done with an iovec of buffers to
transfer. That can't handle variably sized structures, nor state which is
only obtained during migration rather than being routinely tracked.

Replace both the handlers and the sections with an ordered list of
migration "stages". Each stage has a callback for both the source and the
target side, doing whatever is necessary - these can be NULL, for example
for preparation steps that have no equivalent on the other side. Things
that are just buffers to be transferred are handled with a macro generating
a suitable stage entry.
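As a rough illustration of that scheme (editor's sketch, not code from this
series: struct migrate_stage and DATA_STAGE() are the migrate.c/migrate.h
definitions introduced below, while the "tcp_state" stage and its callbacks
are made-up examples), adding state to a hypothetical later version would
mean appending entries to that version's ordered stage list:

static const struct migrate_stage stages_v2[] = {
	DATA_STAGE(flow_first_free),
	DATA_STAGE(flowtab),
	DATA_STAGE(flow_hashtab),
	{
		.name	= "tcp_state",		/* example of a custom stage */
		.source	= tcp_state_send,	/* hypothetical callback */
		.target	= tcp_state_recv,	/* hypothetical callback */
	},
};

Stages with a NULL .source or .target are simply skipped on that side, which
is what makes a single ordered list enough for both directions.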
-- Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- Makefile | 12 +-- migrate.c | 250 ++++++++++++++++++++++++++++++++++++++++++++++++++++ migrate.h | 94 ++++++++++++++++++++ passt.c | 2 +- vu_common.c | 58 +++++------- vu_common.h | 2 +- 6 files changed, 372 insertions(+), 46 deletions(-) create mode 100644 migrate.c create mode 100644 migrate.h diff --git a/Makefile b/Makefile index 6ab8d24..f9e8a3c 100644 --- a/Makefile +++ b/Makefile @@ -38,8 +38,8 @@ FLAGS += -DDUAL_STACK_SOCKETS=$(DUAL_STACK_SOCKETS) PASST_SRCS = arch.c arp.c checksum.c conf.c dhcp.c dhcpv6.c flow.c fwd.c \ icmp.c igmp.c inany.c iov.c ip.c isolation.c lineread.c log.c mld.c \ - ndp.c netlink.c packet.c passt.c pasta.c pcap.c pif.c tap.c tcp.c \ - tcp_buf.c tcp_splice.c tcp_vu.c udp.c udp_flow.c udp_vu.c util.c \ + ndp.c netlink.c migrate.c packet.c passt.c pasta.c pcap.c pif.c tap.c \ + tcp.c tcp_buf.c tcp_splice.c tcp_vu.c udp.c udp_flow.c udp_vu.c util.c \ vhost_user.c virtio.c vu_common.c QRAP_SRCS = qrap.c PASST_REPAIR_SRCS = passt-repair.c @@ -49,10 +49,10 @@ MANPAGES = passt.1 pasta.1 qrap.1 passt-repair.1 PASST_HEADERS = arch.h arp.h checksum.h conf.h dhcp.h dhcpv6.h flow.h fwd.h \ flow_table.h icmp.h icmp_flow.h inany.h iov.h ip.h isolation.h \ - lineread.h log.h ndp.h netlink.h packet.h passt.h pasta.h pcap.h pif.h \ - siphash.h tap.h tcp.h tcp_buf.h tcp_conn.h tcp_internal.h tcp_splice.h \ - tcp_vu.h udp.h udp_flow.h udp_internal.h udp_vu.h util.h vhost_user.h \ - virtio.h vu_common.h + lineread.h log.h migrate.h ndp.h netlink.h packet.h passt.h pasta.h \ + pcap.h pif.h siphash.h tap.h tcp.h tcp_buf.h tcp_conn.h tcp_internal.h \ + tcp_splice.h tcp_vu.h udp.h udp_flow.h udp_internal.h udp_vu.h util.h \ + vhost_user.h virtio.h vu_common.h HEADERS = $(PASST_HEADERS) seccomp.h C := \#include <sys/random.h>\nint main(){int a=getrandom(0, 0, 0);} diff --git a/migrate.c b/migrate.c new file mode 100644 index 0000000..a4b8a1d --- /dev/null +++ b/migrate.c @@ -0,0 +1,250 @@ +// SPDX-License-Identifier: GPL-2.0-or-later + +/* PASST - Plug A Simple Socket Transport + * for qemu/UNIX domain socket mode + * + * PASTA - Pack A Subtle Tap Abstraction + * for network namespace/tap device mode + * + * migrate.c - Migration sections, layout, and routines + * + * Copyright (c) 2025 Red Hat GmbH + * Author: Stefano Brivio <sbrivio(a)redhat.com> + */ + +#include <errno.h> +#include <sys/uio.h> + +#include "util.h" +#include "ip.h" +#include "passt.h" +#include "inany.h" +#include "flow.h" +#include "flow_table.h" + +#include "migrate.h" + +/* Current version of migration data */ +#define MIGRATE_VERSION 1 + +/* Magic as we see it and as seen with reverse endianness */ +#define MIGRATE_MAGIC 0xB1BB1D1B0BB1D1B0 +#define MIGRATE_MAGIC_SWAPPED 0xB0D1B1B01B1DBBB1 + +/* Migration header to send from source */ +static union migrate_header header = { + /* Immutable part of header structure: keep these two sections at the + * beginning, because they are enough to identify a version regardless + * of metadata. 
+ */ + .magic = MIGRATE_MAGIC, + .version = htonl_constant(MIGRATE_VERSION), + /* End of immutable part of header structure */ + + .time_t_size = htonl_constant(sizeof(time_t)), + .flow_size = htonl_constant(sizeof(union flow)), + .flow_sidx_size = htonl_constant(sizeof(struct flow_sidx)), + .voidp_size = htonl_constant(sizeof(void *)), +}; + +/** + * migrate_send_block() - Migration stage handler to send verbatim data + * @c: Execution context + * @m: Migration metadata + * @stage: Migration stage + * @fd: Migration fd + * + * Sends the buffer in @stage->iov over the migration channel. + */ +static int migrate_send_block(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd) +{ + (void)c; + (void)m; + + if (write_remainder(fd, &stage->iov, 1, 0) < 0) + return errno; + + return 0; +} + +/** + * migrate_recv_block() - Migration stage handler to receive verbatim data + * @c: Execution context + * @m: Migration metadata + * @stage: Migration stage + * @fd: Migration fd + * + * Reads the buffer in @stage->iov from the migration channel. + * + * #syscalls:vu readv + */ +static int migrate_recv_block(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd) +{ + (void)c; + (void)m; + + if (read_remainder(fd, &stage->iov, 1, 0) < 0) + return errno; + + return 0; +} + +#define DATA_STAGE(v) \ + { \ + .name = #v, \ + .source = migrate_send_block, \ + .target = migrate_recv_block, \ + .iov = { &(v), sizeof(v) }, \ + } + +/* Stages for version 1 */ +static const struct migrate_stage stages_v1[] = { + { + .name = "flow pre", + .target = NULL, + }, + DATA_STAGE(flow_first_free), + DATA_STAGE(flowtab), + DATA_STAGE(flow_hashtab), + { + .name = "flow post", + .source = NULL, + }, +}; + +/* Set of data versions */ +static const struct migrate_version versions[] = { + { + 1, stages_v1, ARRAY_SIZE(stages_v1), + }, +}; + +/** + * migrate_source() - Migration as source, send state to hypervisor + * @c: Execution context + * @fd: File descriptor for state transfer + * + * Return: 0 on success, positive error code on failure + */ +int migrate_source(struct ctx *c, int fd) +{ + struct migrate_meta m; + unsigned i; + int rc; + + rc = write_all_buf(fd, &header, sizeof(header)); + if (rc) { + err("Failed to send migration header: %s, abort", strerror_(rc)); + return rc; + } + + for (i = 0; i < ARRAY_SIZE(stages_v1); i++) { + const struct migrate_stage *stage = &stages_v1[i]; + + if (!stage->source) + continue; + + debug("Source side migration: %s", stage->name); + + if ((rc = stage->source(c, &m, stage, fd))) { + err("Source migration failed stage %s: %s, abort", + stage->name, strerror_(rc)); + return rc; + } + } + + return 0; +} + +/** + * migrate_target_read_header() - Set metadata in target from source header + * @fd: Descriptor for state transfer + * @m: Migration metadata, filled on return + * + * Return: 0 on success, error code on failure + */ +static int migrate_target_read_header(int fd, struct migrate_meta *m) +{ + union migrate_header h; + unsigned i; + + if (read_all_buf(fd, &h, sizeof(h))) + return errno; + + debug("Source magic: 0x%016" PRIx64 ", sizeof(void *): %u, version: %u", + h.magic, ntohl(h.voidp_size), ntohl(h.version)); + + m->version = ntohl(h.version); + m->v = NULL; + for (i = 0; i < ARRAY_SIZE(versions); i++) { + if (versions[i].version == m->version) + m->v = &versions[i]; + } + if (!m->v) + return ENOTSUP; + + if (h.magic == MIGRATE_MAGIC) + m->bswap = false; + else if (h.magic == MIGRATE_MAGIC_SWAPPED) + m->bswap = true; + else + 
return ENOTSUP; + + if (ntohl(h.voidp_size) == 4) + m->source_64b = false; + else if (ntohl(h.voidp_size) == 8) + m->source_64b = true; + else + return ENOTSUP; + + if (ntohl(h.time_t_size) == 4) + m->time_64b = false; + else if (ntohl(h.time_t_size) == 8) + m->time_64b = true; + else + return ENOTSUP; + + m->flow_size = ntohl(h.flow_size); + m->flow_sidx_size = ntohl(h.flow_sidx_size); + + return 0; +} + +/** + * migrate_target() - Migration as target, receive state from hypervisor + * @c: Execution context + * @fd: File descriptor for state transfer + * + * Return: 0 on success, positive error code on failure + */ +int migrate_target(struct ctx *c, int fd) +{ + struct migrate_meta m; + unsigned i; + int rc; + + rc = migrate_target_read_header(fd, &m); + if (rc) { + err("Migration header check failed: %s, abort", strerror_(rc)); + return rc; + } + + for (i = 0; i < m.v->nstages; i++) { + const struct migrate_stage *stage = &m.v->stages[i]; + + if (!stage->target) + continue; + + debug("Target side migration: %s", stage->name); + + if ((rc = stage->target(c, &m, stage, fd))) { + err("Target migration failed stage %s: %s, abort", + stage->name, strerror_(rc)); + return rc; + } + } + + return 0; +} diff --git a/migrate.h b/migrate.h new file mode 100644 index 0000000..793d5e5 --- /dev/null +++ b/migrate.h @@ -0,0 +1,94 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later + * Copyright (c) 2025 Red Hat GmbH + * Author: Stefano Brivio <sbrivio(a)redhat.com> + */ + +#ifndef MIGRATE_H +#define MIGRATE_H + +struct migrate_version; + +/** + * struct migrate_meta - Migration metadata + * @version: Chosen migration data version, host order + * @v: Migration version information + * @bswap: Source has opposite endianness + * @peer_64b: Source uses 64-bit void * + * @time_64b: Source uses 64-bit time_t + * @flow_size: Size of union flow in source + * @flow_sidx_size: Size of struct flow_sidx in source + */ +struct migrate_meta { + uint32_t version; + const struct migrate_version *v; + bool bswap; + bool source_64b; + bool time_64b; + size_t flow_size; + size_t flow_sidx_size; +}; + +/** + * union migrate_header - Migration header from source + * @magic: 0xB1BB1D1B0BB1D1B0, host order + * @version: Source sends highest known, target aborts if unsupported + * @voidp_size: sizeof(void *), network order + * @time_t_size: sizeof(time_t), network order + * @flow_size: sizeof(union flow), network order + * @flow_sidx_size: sizeof(struct flow_sidx_t), network order + * @unused: Go figure + */ +union migrate_header { + struct { + uint64_t magic; + uint32_t version; + uint32_t voidp_size; + uint32_t time_t_size; + uint32_t flow_size; + uint32_t flow_sidx_size; + }; + uint8_t unused[4096]; +} __attribute__((packed)); + +struct migrate_stage; + +/** + * migrate_cb_t - Callback function to implement one migration stage + */ +typedef int (*migrate_cb_t)(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd); + +/** + * struct migrate_stage - Callbacks and parameters for one stage of migration + * @name: Stage name (for debugging) + * @source: Callback to implement this stage on the source + * @target: Callback to implement this stage on the target + * @v: Source version this applies to, host order + * @sections: Array of data sections, NULL-terminated + */ +struct migrate_stage { + const char *name; + migrate_cb_t source; + migrate_cb_t target; + /* FIXME: rollback callbacks? 
*/ + union { + struct iovec iov; + }; +}; + +/** + * struct migrate_version - Stages for a particular protocol version + * @version: Version number this implements + * @stages: Ordered array of stages + * @nstages: Length of @stages + */ +struct migrate_version { + uint32_t version; + const struct migrate_stage *stages; + unsigned nstages; +}; + +int migrate_source(struct ctx *c, int fd); +int migrate_target(struct ctx *c, int fd); + +#endif /* MIGRATE_H */ diff --git a/passt.c b/passt.c index b1c8ab6..184d4e5 100644 --- a/passt.c +++ b/passt.c @@ -358,7 +358,7 @@ loop: vu_kick_cb(c.vdev, ref, &now); break; case EPOLL_TYPE_VHOST_MIGRATION: - vu_migrate(c.vdev, eventmask); + vu_migrate(&c, eventmask); break; default: /* Can't happen */ diff --git a/vu_common.c b/vu_common.c index ab04d31..3d41824 100644 --- a/vu_common.c +++ b/vu_common.c @@ -5,6 +5,7 @@ * common_vu.c - vhost-user common UDP and TCP functions */ +#include <errno.h> #include <unistd.h> #include <sys/uio.h> #include <sys/eventfd.h> @@ -17,6 +18,7 @@ #include "vhost_user.h" #include "pcap.h" #include "vu_common.h" +#include "migrate.h" #define VU_MAX_TX_BUFFER_NB 2 @@ -305,48 +307,28 @@ err: } /** - * vu_migrate() - Send/receive passt insternal state to/from QEMU - * @vdev: vhost-user device + * vu_migrate() - Send/receive passt internal state to/from QEMU + * @c: Execution context * @events: epoll events */ -void vu_migrate(struct vu_dev *vdev, uint32_t events) +void vu_migrate(struct ctx *c, uint32_t events) { - int ret; + struct vu_dev *vdev = c->vdev; + int rc = EIO; - /* TODO: collect/set passt internal state - * and use vdev->device_state_fd to send/receive it - */ debug("vu_migrate fd %d events %x", vdev->device_state_fd, events); - if (events & EPOLLOUT) { - debug("Saving backend state"); - - /* send some stuff */ - ret = write(vdev->device_state_fd, "PASST", 6); - /* value to be returned by VHOST_USER_CHECK_DEVICE_STATE */ - vdev->device_state_result = ret == -1 ? -1 : 0; - /* Closing the file descriptor signals the end of transfer */ - epoll_del(vdev->context, vdev->device_state_fd); - close(vdev->device_state_fd); - vdev->device_state_fd = -1; - } else if (events & EPOLLIN) { - char buf[6]; - - debug("Loading backend state"); - /* read some stuff */ - ret = read(vdev->device_state_fd, buf, sizeof(buf)); - /* value to be returned by VHOST_USER_CHECK_DEVICE_STATE */ - if (ret != sizeof(buf)) { - vdev->device_state_result = -1; - } else { - ret = strncmp(buf, "PASST", sizeof(buf)); - vdev->device_state_result = ret == 0 ? 0 : -1; - } - } else if (events & EPOLLHUP) { - debug("Closing migration channel"); - /* The end of file signals the end of the transfer. */ - epoll_del(vdev->context, vdev->device_state_fd); - close(vdev->device_state_fd); - vdev->device_state_fd = -1; - } + if (events & EPOLLOUT) + rc = migrate_source(c, vdev->device_state_fd); + else if (events & EPOLLIN) + rc = migrate_target(c, vdev->device_state_fd); + + /* EPOLLHUP without EPOLLIN/EPOLLOUT, or EPOLLERR? 
Migration failed */ + + vdev->device_state_result = rc; + + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, vdev->device_state_fd, NULL); + debug("Closing migration channel"); + close(vdev->device_state_fd); + vdev->device_state_fd = -1; } diff --git a/vu_common.h b/vu_common.h index d56c021..69c4006 100644 --- a/vu_common.h +++ b/vu_common.h @@ -57,5 +57,5 @@ void vu_flush(const struct vu_dev *vdev, struct vu_virtq *vq, void vu_kick_cb(struct vu_dev *vdev, union epoll_ref ref, const struct timespec *now); int vu_send_single(const struct ctx *c, const void *buf, size_t size); -void vu_migrate(struct vu_dev *vdev, uint32_t events); +void vu_migrate(struct ctx *c, uint32_t events); #endif /* VU_COMMON_H */ -- 2.43.0
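One detail of migrate.c worth spelling out: the magic value is intended to
double as an endianness probe, since the source defines it both as-is and
with reversed byte order. A minimal standalone sketch of that check,
simplified by the editor from migrate_target_read_header() and using the
same constants as above:

#include <stdbool.h>
#include <stdint.h>

#define MIGRATE_MAGIC		0xB1BB1D1B0BB1D1B0
#define MIGRATE_MAGIC_SWAPPED	0xB0D1B1B01B1DBBB1

/* Return 0 and set *bswap if the magic is recognised, -1 otherwise */
static int magic_to_bswap(uint64_t magic, bool *bswap)
{
	if (magic == MIGRATE_MAGIC)
		*bswap = false;		/* source has our endianness */
	else if (magic == MIGRATE_MAGIC_SWAPPED)
		*bswap = true;		/* source has opposite endianness */
	else
		return -1;		/* not a stream we understand */

	return 0;
}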
From: David Gibson <david(a)gibson.dropbear.id.au> A lot of the migration logic in vhost_user.c and vu_common.c isn't really specific to vhost-user, but matches the overall structure of migration. This applies to vu_migrate() and to the parts of of vu_set_device_state_fd_exec() which aren't related to parsing the specific vhost-user control request. Move this logic to migrate.c, with matching renames. Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- epoll_type.h | 4 +-- migrate.c | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++-- migrate.h | 6 ++-- passt.c | 6 ++-- passt.h | 6 ++++ vhost_user.c | 58 ++++----------------------------- virtio.h | 4 --- vu_common.c | 27 --------------- vu_common.h | 2 +- 9 files changed, 113 insertions(+), 92 deletions(-) diff --git a/epoll_type.h b/epoll_type.h index fd9eac3..b1d3a1d 100644 --- a/epoll_type.h +++ b/epoll_type.h @@ -40,8 +40,8 @@ enum epoll_type { EPOLL_TYPE_VHOST_CMD, /* vhost-user kick event socket */ EPOLL_TYPE_VHOST_KICK, - /* vhost-user migration socket */ - EPOLL_TYPE_VHOST_MIGRATION, + /* migration device state channel */ + EPOLL_TYPE_DEVICE_STATE, EPOLL_NUM_TYPES, }; diff --git a/migrate.c b/migrate.c index a4b8a1d..b7e5431 100644 --- a/migrate.c +++ b/migrate.c @@ -21,6 +21,7 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" +#include "repair.h" #include "migrate.h" @@ -128,7 +129,7 @@ static const struct migrate_version versions[] = { * * Return: 0 on success, positive error code on failure */ -int migrate_source(struct ctx *c, int fd) +static int migrate_source(struct ctx *c, int fd) { struct migrate_meta m; unsigned i; @@ -219,7 +220,7 @@ static int migrate_target_read_header(int fd, struct migrate_meta *m) * * Return: 0 on success, positive error code on failure */ -int migrate_target(struct ctx *c, int fd) +static int migrate_target(struct ctx *c, int fd) { struct migrate_meta m; unsigned i; @@ -248,3 +249,90 @@ int migrate_target(struct ctx *c, int fd) return 0; } + +/** + * set_migration_watch() - Add the migration file descriptor to epoll + * @c: Execution context + * @fd: File descriptor to add + * @target: Are we the target of the migration? + */ +static void set_migration_watch(const struct ctx *c, int fd, bool target) +{ + union epoll_ref ref = { + .type = EPOLL_TYPE_DEVICE_STATE, + .fd = fd, + }; + struct epoll_event ev = { 0 }; + + ev.data.u64 = ref.u64; + ev.events = target ? EPOLLIN : EPOLLOUT; + + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, ref.fd, &ev); +} + +/** + * migrate_init() - Set up things necessary for migration + * @c: Execution context + */ +void migrate_init(struct ctx *c) +{ + c->device_state_fd = -1; + c->device_state_result = -1; +} + +/** + * migrate_close() - Close migration channel + * @c: Execution context + */ +void migrate_close(struct ctx *c) +{ + if (c->device_state_fd != -1) { + debug("Closing migration channel, fd: %d", c->device_state_fd); + epoll_del(c, c->device_state_fd); + close(c->device_state_fd); + c->device_state_fd = -1; + c->device_state_result = -1; + } +} + +/** + * migrate_request() - Request a migration of device state + * @c: Execution context + * @fd: fd to transfer state + * @target: Are we the target of the migration? 
+ */ +void migrate_request(struct ctx *c, int fd, bool target) +{ + debug("Migration requested, fd: %d (was %d)", + fd, c->device_state_fd); + + if (c->device_state_fd != -1) + migrate_close(c); + + c->device_state_fd = fd; + set_migration_watch(c, c->device_state_fd, target); + +} + +/** + * migrate_handler() - Send/receive passt internal state to/from QEMU + * @c: Execution context + * @events: epoll events + */ +void migrate_handler(struct ctx *c, uint32_t events) +{ + int rc = EIO; + + debug("migrate_handler fd %d events %x", c->device_state_fd, events); + + if (events & EPOLLOUT) + rc = migrate_source(c, c->device_state_fd); + else if (events & EPOLLIN) + rc = migrate_target(c, c->device_state_fd); + + /* EPOLLHUP without EPOLLIN/EPOLLOUT, or EPOLLERR? Migration failed */ + + migrate_close(c); + + c->device_state_result = rc; +} diff --git a/migrate.h b/migrate.h index 793d5e5..ac15d0f 100644 --- a/migrate.h +++ b/migrate.h @@ -88,7 +88,9 @@ struct migrate_version { unsigned nstages; }; -int migrate_source(struct ctx *c, int fd); -int migrate_target(struct ctx *c, int fd); +void migrate_init(struct ctx *c); +void migrate_close(struct ctx *c); +void migrate_request(struct ctx *c, int fd, bool target); +void migrate_handler(struct ctx *c, uint32_t events); #endif /* MIGRATE_H */ diff --git a/passt.c b/passt.c index 184d4e5..afd63c2 100644 --- a/passt.c +++ b/passt.c @@ -75,7 +75,7 @@ char *epoll_type_str[] = { [EPOLL_TYPE_TAP_LISTEN] = "listening qemu socket", [EPOLL_TYPE_VHOST_CMD] = "vhost-user command socket", [EPOLL_TYPE_VHOST_KICK] = "vhost-user kick socket", - [EPOLL_TYPE_VHOST_MIGRATION] = "vhost-user migration socket", + [EPOLL_TYPE_DEVICE_STATE] = "migration device state channel", }; static_assert(ARRAY_SIZE(epoll_type_str) == EPOLL_NUM_TYPES, "epoll_type_str[] doesn't match enum epoll_type"); @@ -357,8 +357,8 @@ loop: case EPOLL_TYPE_VHOST_KICK: vu_kick_cb(c.vdev, ref, &now); break; - case EPOLL_TYPE_VHOST_MIGRATION: - vu_migrate(&c, eventmask); + case EPOLL_TYPE_DEVICE_STATE: + migrate_handler(&c, eventmask); break; default: /* Can't happen */ diff --git a/passt.h b/passt.h index 0dd4efa..0b8a26c 100644 --- a/passt.h +++ b/passt.h @@ -235,6 +235,8 @@ struct ip6_ctx { * @low_wmem: Low probed net.core.wmem_max * @low_rmem: Low probed net.core.rmem_max * @vdev: vhost-user device + * @device_state_fd: Device state migration channel + * @device_state_result: Device state migration result */ struct ctx { enum passt_modes mode; @@ -300,6 +302,10 @@ struct ctx { int low_rmem; struct vu_dev *vdev; + + /* Migration */ + int device_state_fd; + int device_state_result; }; void proto_update_l2_buf(const unsigned char *eth_d, diff --git a/vhost_user.c b/vhost_user.c index 9e38cfd..b107d0f 100644 --- a/vhost_user.c +++ b/vhost_user.c @@ -997,36 +997,6 @@ static bool vu_send_rarp_exec(struct vu_dev *vdev, return false; } -/** - * vu_set_migration_watch() - Add the migration file descriptor to epoll - * @vdev: vhost-user device - * @fd: File descriptor to add - * @direction: Direction of the migration (save or load backend state) - */ -static void vu_set_migration_watch(const struct vu_dev *vdev, int fd, - uint32_t direction) -{ - union epoll_ref ref = { - .type = EPOLL_TYPE_VHOST_MIGRATION, - .fd = fd, - }; - struct epoll_event ev = { 0 }; - - ev.data.u64 = ref.u64; - switch (direction) { - case VHOST_USER_TRANSFER_STATE_DIRECTION_SAVE: - ev.events = EPOLLOUT; - break; - case VHOST_USER_TRANSFER_STATE_DIRECTION_LOAD: - ev.events = EPOLLIN; - break; - default: - ASSERT(0); - } - - 
epoll_ctl(vdev->context->epollfd, EPOLL_CTL_ADD, ref.fd, &ev); -} - /** * vu_set_device_state_fd_exec() - Set the device state migration channel * @vdev: vhost-user device @@ -1051,16 +1021,8 @@ static bool vu_set_device_state_fd_exec(struct vu_dev *vdev, direction != VHOST_USER_TRANSFER_STATE_DIRECTION_LOAD) die("Invalide device_state_fd direction: %d", direction); - if (vdev->device_state_fd != -1) { - epoll_del(vdev->context, vdev->device_state_fd); - close(vdev->device_state_fd); - } - - vdev->device_state_fd = msg->fds[0]; - vdev->device_state_result = -1; - vu_set_migration_watch(vdev, vdev->device_state_fd, direction); - - debug("Got device_state_fd: %d", vdev->device_state_fd); + migrate_request(vdev->context, msg->fds[0], + direction == VHOST_USER_TRANSFER_STATE_DIRECTION_LOAD); /* We don't provide a new fd for the data transfer */ vmsg_set_reply_u64(msg, VHOST_USER_VRING_NOFD_MASK); @@ -1078,9 +1040,7 @@ static bool vu_set_device_state_fd_exec(struct vu_dev *vdev, static bool vu_check_device_state_exec(struct vu_dev *vdev, struct vhost_user_msg *msg) { - (void)vdev; - - vmsg_set_reply_u64(msg, vdev->device_state_result); + vmsg_set_reply_u64(msg, vdev->context->device_state_result); return true; } @@ -1106,8 +1066,8 @@ void vu_init(struct ctx *c) } c->vdev->log_table = NULL; c->vdev->log_call_fd = -1; - c->vdev->device_state_fd = -1; - c->vdev->device_state_result = -1; + + migrate_init(c); } @@ -1157,12 +1117,8 @@ void vu_cleanup(struct vu_dev *vdev) vu_close_log(vdev); - if (vdev->device_state_fd != -1) { - epoll_del(vdev->context, vdev->device_state_fd); - close(vdev->device_state_fd); - vdev->device_state_fd = -1; - vdev->device_state_result = -1; - } + /* If we lose the VU dev, we also lose our migration channel */ + migrate_close(vdev->context); } /** diff --git a/virtio.h b/virtio.h index 7bef2d2..0a59441 100644 --- a/virtio.h +++ b/virtio.h @@ -106,8 +106,6 @@ struct vu_dev_region { * @log_call_fd: Eventfd to report logging update * @log_size: Size of the logging memory region * @log_table: Base of the logging memory region - * @device_state_fd: Device state migration channel - * @device_state_result: Device state migration result */ struct vu_dev { struct ctx *context; @@ -119,8 +117,6 @@ struct vu_dev { int log_call_fd; uint64_t log_size; uint8_t *log_table; - int device_state_fd; - int device_state_result; }; /** diff --git a/vu_common.c b/vu_common.c index 3d41824..48826b1 100644 --- a/vu_common.c +++ b/vu_common.c @@ -305,30 +305,3 @@ err: return -1; } - -/** - * vu_migrate() - Send/receive passt internal state to/from QEMU - * @c: Execution context - * @events: epoll events - */ -void vu_migrate(struct ctx *c, uint32_t events) -{ - struct vu_dev *vdev = c->vdev; - int rc = EIO; - - debug("vu_migrate fd %d events %x", vdev->device_state_fd, events); - - if (events & EPOLLOUT) - rc = migrate_source(c, vdev->device_state_fd); - else if (events & EPOLLIN) - rc = migrate_target(c, vdev->device_state_fd); - - /* EPOLLHUP without EPOLLIN/EPOLLOUT, or EPOLLERR? 
Migration failed */ - - vdev->device_state_result = rc; - - epoll_ctl(c->epollfd, EPOLL_CTL_DEL, vdev->device_state_fd, NULL); - debug("Closing migration channel"); - close(vdev->device_state_fd); - vdev->device_state_fd = -1; -} diff --git a/vu_common.h b/vu_common.h index 69c4006..f538f23 100644 --- a/vu_common.h +++ b/vu_common.h @@ -57,5 +57,5 @@ void vu_flush(const struct vu_dev *vdev, struct vu_virtq *vq, void vu_kick_cb(struct vu_dev *vdev, union epoll_ref ref, const struct timespec *now); int vu_send_single(const struct ctx *c, const void *buf, size_t size); -void vu_migrate(struct ctx *c, uint32_t events); + #endif /* VU_COMMON_H */ -- 2.43.0
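For readers not familiar with passt's epoll dispatch, set_migration_watch()
above follows the usual union epoll_ref idiom: type and fd are packed into
epoll_event.data.u64 so the main loop can switch on the type. An editor's
simplified sketch of that convention (the real union epoll_ref carries more
fields, and the names below are made up):

#include <stdint.h>
#include <sys/epoll.h>

union epoll_ref_sketch {
	struct {
		uint32_t type;	/* EPOLL_TYPE_* value */
		int32_t fd;	/* file descriptor the event refers to */
	};
	uint64_t u64;
};

static int watch_fd(int epollfd, int fd, uint32_t type, uint32_t events)
{
	union epoll_ref_sketch ref = { .type = type, .fd = fd };
	struct epoll_event ev = { 0 };

	ev.data.u64 = ref.u64;
	ev.events = events;

	return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}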
From: David Gibson <david(a)gibson.dropbear.id.au> Currently, once a migration device state fd is assigned, we wait for EPOLLIN or EPOLLOUT events on it to actually perform the migration. Change it so that once a migration is requested it we complete it synchronously at the end of the current epoll cycle. This has several advantages: 1. It makes it clear that everything about the migration must be dealt with at once, not split between multiple epoll events on the channel 2. It ensures the migration always takes place between epoll cycles, rather than, for example, between handling TCP events and their deferred handling in post_handler(). 3. It reduces code setting up the epoll watch on the fd. Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- epoll_type.h | 2 -- migrate.c | 43 +++++++++++-------------------------------- migrate.h | 2 +- passt.c | 8 ++++---- passt.h | 2 ++ 5 files changed, 18 insertions(+), 39 deletions(-) diff --git a/epoll_type.h b/epoll_type.h index b1d3a1d..f3ef415 100644 --- a/epoll_type.h +++ b/epoll_type.h @@ -40,8 +40,6 @@ enum epoll_type { EPOLL_TYPE_VHOST_CMD, /* vhost-user kick event socket */ EPOLL_TYPE_VHOST_KICK, - /* migration device state channel */ - EPOLL_TYPE_DEVICE_STATE, EPOLL_NUM_TYPES, }; diff --git a/migrate.c b/migrate.c index b7e5431..f64ffc8 100644 --- a/migrate.c +++ b/migrate.c @@ -250,26 +250,6 @@ static int migrate_target(struct ctx *c, int fd) return 0; } -/** - * set_migration_watch() - Add the migration file descriptor to epoll - * @c: Execution context - * @fd: File descriptor to add - * @target: Are we the target of the migration? - */ -static void set_migration_watch(const struct ctx *c, int fd, bool target) -{ - union epoll_ref ref = { - .type = EPOLL_TYPE_DEVICE_STATE, - .fd = fd, - }; - struct epoll_event ev = { 0 }; - - ev.data.u64 = ref.u64; - ev.events = target ? EPOLLIN : EPOLLOUT; - - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, ref.fd, &ev); -} - /** * migrate_init() - Set up things necessary for migration * @c: Execution context @@ -288,7 +268,6 @@ void migrate_close(struct ctx *c) { if (c->device_state_fd != -1) { debug("Closing migration channel, fd: %d", c->device_state_fd); - epoll_del(c, c->device_state_fd); close(c->device_state_fd); c->device_state_fd = -1; c->device_state_result = -1; @@ -310,27 +289,27 @@ void migrate_request(struct ctx *c, int fd, bool target) migrate_close(c); c->device_state_fd = fd; - set_migration_watch(c, c->device_state_fd, target); - + c->migrate_target = target; } /** * migrate_handler() - Send/receive passt internal state to/from QEMU * @c: Execution context - * @events: epoll events */ -void migrate_handler(struct ctx *c, uint32_t events) +void migrate_handler(struct ctx *c) { - int rc = EIO; + int rc; - debug("migrate_handler fd %d events %x", c->device_state_fd, events); + if (c->device_state_fd < 0) + return; - if (events & EPOLLOUT) - rc = migrate_source(c, c->device_state_fd); - else if (events & EPOLLIN) - rc = migrate_target(c, c->device_state_fd); + debug("migrate_handler fd %d target %d", + c->device_state_fd, c->migrate_target); - /* EPOLLHUP without EPOLLIN/EPOLLOUT, or EPOLLERR? 
Migration failed */ + if (c->migrate_target) + rc = migrate_target(c, c->device_state_fd); + else + rc = migrate_source(c, c->device_state_fd); migrate_close(c); diff --git a/migrate.h b/migrate.h index ac15d0f..9c49e78 100644 --- a/migrate.h +++ b/migrate.h @@ -91,6 +91,6 @@ struct migrate_version { void migrate_init(struct ctx *c); void migrate_close(struct ctx *c); void migrate_request(struct ctx *c, int fd, bool target); -void migrate_handler(struct ctx *c, uint32_t events); +void migrate_handler(struct ctx *c); #endif /* MIGRATE_H */ diff --git a/passt.c b/passt.c index afd63c2..8682022 100644 --- a/passt.c +++ b/passt.c @@ -75,7 +75,8 @@ char *epoll_type_str[] = { [EPOLL_TYPE_TAP_LISTEN] = "listening qemu socket", [EPOLL_TYPE_VHOST_CMD] = "vhost-user command socket", [EPOLL_TYPE_VHOST_KICK] = "vhost-user kick socket", - [EPOLL_TYPE_DEVICE_STATE] = "migration device state channel", + [EPOLL_TYPE_REPAIR_LISTEN] = "TCP_REPAIR helper listening socket", + [EPOLL_TYPE_REPAIR] = "TCP_REPAIR helper socket", }; static_assert(ARRAY_SIZE(epoll_type_str) == EPOLL_NUM_TYPES, "epoll_type_str[] doesn't match enum epoll_type"); @@ -357,9 +358,6 @@ loop: case EPOLL_TYPE_VHOST_KICK: vu_kick_cb(c.vdev, ref, &now); break; - case EPOLL_TYPE_DEVICE_STATE: - migrate_handler(&c, eventmask); - break; default: /* Can't happen */ ASSERT(0); @@ -368,5 +366,7 @@ loop: post_handler(&c, &now); + migrate_handler(&c); + goto loop; } diff --git a/passt.h b/passt.h index 0b8a26c..2255f18 100644 --- a/passt.h +++ b/passt.h @@ -237,6 +237,7 @@ struct ip6_ctx { * @vdev: vhost-user device * @device_state_fd: Device state migration channel * @device_state_result: Device state migration result + * @migrate_target: Is this the target for next migration? */ struct ctx { enum passt_modes mode; @@ -306,6 +307,7 @@ struct ctx { /* Migration */ int device_state_fd; int device_state_result; + bool migrate_target; }; void proto_update_l2_buf(const unsigned char *eth_d, -- 2.43.0
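The net effect on the main loop is easiest to see as a shape sketch
(editor's simplification of passt.c after this patch, not the actual code;
handle_event() and timeout_ms stand in for the real dispatch switch and
epoll timeout):

struct epoll_event events[EPOLL_EVENTS];
int i, nfds;

for (;;) {
	nfds = epoll_wait(c.epollfd, events, EPOLL_EVENTS, timeout_ms);

	for (i = 0; i < nfds; i++)
		handle_event(&c, &events[i]);	/* hypothetical dispatcher */

	post_handler(&c, &now);	/* deferred flushes and timers first */
	migrate_handler(&c);	/* then migration, strictly between cycles */
}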
In vhost-user mode, by default, create a second UNIX domain socket accepting connections from passt-repair, with the usual listener socket. When we need to set or clear TCP_REPAIR on sockets, we'll send them via SCM_RIGHTS to passt-repair, who sets the socket option values we ask for. To that end, introduce batched functions to request TCP_REPAIR settings on sockets, so that we don't have to send a single message for each socket, on migration. When needed, repair_flush() will send the message and check for the reply. Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- Makefile | 12 ++-- conf.c | 44 ++++++++++-- epoll_type.h | 4 ++ passt.1 | 11 +++ passt.c | 7 ++ passt.h | 7 ++ repair.c | 193 +++++++++++++++++++++++++++++++++++++++++++++++++++ repair.h | 16 +++++ tap.c | 65 +---------------- util.c | 62 +++++++++++++++++ util.h | 1 + 11 files changed, 350 insertions(+), 72 deletions(-) create mode 100644 repair.c create mode 100644 repair.h diff --git a/Makefile b/Makefile index f9e8a3c..fde6670 100644 --- a/Makefile +++ b/Makefile @@ -38,9 +38,9 @@ FLAGS += -DDUAL_STACK_SOCKETS=$(DUAL_STACK_SOCKETS) PASST_SRCS = arch.c arp.c checksum.c conf.c dhcp.c dhcpv6.c flow.c fwd.c \ icmp.c igmp.c inany.c iov.c ip.c isolation.c lineread.c log.c mld.c \ - ndp.c netlink.c migrate.c packet.c passt.c pasta.c pcap.c pif.c tap.c \ - tcp.c tcp_buf.c tcp_splice.c tcp_vu.c udp.c udp_flow.c udp_vu.c util.c \ - vhost_user.c virtio.c vu_common.c + ndp.c netlink.c migrate.c packet.c passt.c pasta.c pcap.c pif.c \ + repair.c tap.c tcp.c tcp_buf.c tcp_splice.c tcp_vu.c udp.c udp_flow.c \ + udp_vu.c util.c vhost_user.c virtio.c vu_common.c QRAP_SRCS = qrap.c PASST_REPAIR_SRCS = passt-repair.c SRCS = $(PASST_SRCS) $(QRAP_SRCS) $(PASST_REPAIR_SRCS) @@ -50,9 +50,9 @@ MANPAGES = passt.1 pasta.1 qrap.1 passt-repair.1 PASST_HEADERS = arch.h arp.h checksum.h conf.h dhcp.h dhcpv6.h flow.h fwd.h \ flow_table.h icmp.h icmp_flow.h inany.h iov.h ip.h isolation.h \ lineread.h log.h migrate.h ndp.h netlink.h packet.h passt.h pasta.h \ - pcap.h pif.h siphash.h tap.h tcp.h tcp_buf.h tcp_conn.h tcp_internal.h \ - tcp_splice.h tcp_vu.h udp.h udp_flow.h udp_internal.h udp_vu.h util.h \ - vhost_user.h virtio.h vu_common.h + pcap.h pif.h repair.h siphash.h tap.h tcp.h tcp_buf.h tcp_conn.h \ + tcp_internal.h tcp_splice.h tcp_vu.h udp.h udp_flow.h udp_internal.h \ + udp_vu.h util.h vhost_user.h virtio.h vu_common.h HEADERS = $(PASST_HEADERS) seccomp.h C := \#include <sys/random.h>\nint main(){int a=getrandom(0, 0, 0);} diff --git a/conf.c b/conf.c index df2b016..9ef79a4 100644 --- a/conf.c +++ b/conf.c @@ -816,6 +816,9 @@ static void usage(const char *name, FILE *f, int status) " UNIX domain socket is provided by -s option\n" " --print-capabilities print back-end capabilities in JSON format,\n" " only meaningful for vhost-user mode\n"); + FPRINTF(f, + " --repair-path PATH path for passt-repair(1)\n" + " default: append '.repair' to UNIX domain path\n"); } FPRINTF(f, @@ -1240,8 +1243,25 @@ static void conf_nat(const char *arg, struct in_addr *addr4, */ static void conf_open_files(struct ctx *c) { - if (c->mode != MODE_PASTA && c->fd_tap == -1) - c->fd_tap_listen = tap_sock_unix_open(c->sock_path); + if (c->mode != MODE_PASTA && c->fd_tap == -1) { + c->fd_tap_listen = sock_unix(c->sock_path); + + if (c->mode == MODE_VU && strcmp(c->repair_path, "none")) { + if (!*c->repair_path && + snprintf_check(c->repair_path, + sizeof(c->repair_path), "%s.repair", + c->sock_path)) { + warn("passt-repair path %s not usable", + c->repair_path); + 
c->fd_repair_listen = -1; + } else { + c->fd_repair_listen = sock_unix(c->repair_path); + } + } else { + c->fd_repair_listen = -1; + } + c->fd_repair = -1; + } if (*c->pidfile) { c->pidfile_fd = output_file_open(c->pidfile, O_WRONLY); @@ -1354,9 +1374,12 @@ void conf(struct ctx *c, int argc, char **argv) {"host-lo-to-ns-lo", no_argument, NULL, 23 }, {"dns-host", required_argument, NULL, 24 }, {"vhost-user", no_argument, NULL, 25 }, + /* vhost-user backend program convention */ {"print-capabilities", no_argument, NULL, 26 }, {"socket-path", required_argument, NULL, 's' }, + + {"repair-path", required_argument, NULL, 27 }, { 0 }, }; const char *logname = (c->mode == MODE_PASTA) ? "pasta" : "passt"; @@ -1748,6 +1771,9 @@ void conf(struct ctx *c, int argc, char **argv) case 'D': /* Handle these later, once addresses are configured */ break; + case 27: + /* Handle this once we checked --vhost-user */ + break; case 'h': usage(argv[0], stdout, EXIT_SUCCESS); break; @@ -1824,8 +1850,8 @@ void conf(struct ctx *c, int argc, char **argv) if (c->ifi4 && IN4_IS_ADDR_UNSPECIFIED(&c->ip4.guest_gw)) c->no_dhcp = 1; - /* Inbound port options & DNS can be parsed now (after IPv4/IPv6 - * settings) + /* Inbound port options, DNS, and --repair-path can be parsed now, after + * IPv4/IPv6 settings and --vhost-user. */ fwd_probe_ephemeral(); udp_portmap_clear(); @@ -1871,6 +1897,16 @@ void conf(struct ctx *c, int argc, char **argv) } die("Cannot use DNS address %s", optarg); + } else if (name == 27) { + if (c->mode != MODE_VU && strcmp(optarg, "none")) + die("--repair-path is for vhost-user mode only"); + + if (snprintf_check(c->repair_path, + sizeof(c->repair_path), "%s", + optarg)) + die("Invalid passt-repair path: %s", optarg); + + break; } } while (name != -1); diff --git a/epoll_type.h b/epoll_type.h index f3ef415..7f2a121 100644 --- a/epoll_type.h +++ b/epoll_type.h @@ -40,6 +40,10 @@ enum epoll_type { EPOLL_TYPE_VHOST_CMD, /* vhost-user kick event socket */ EPOLL_TYPE_VHOST_KICK, + /* TCP_REPAIR helper listening socket */ + EPOLL_TYPE_REPAIR_LISTEN, + /* TCP_REPAIR helper socket */ + EPOLL_TYPE_REPAIR, EPOLL_NUM_TYPES, }; diff --git a/passt.1 b/passt.1 index d9cd33e..63a3a01 100644 --- a/passt.1 +++ b/passt.1 @@ -418,6 +418,17 @@ Enable vhost-user. The vhost-user command socket is provided by \fB--socket\fR. .BR \-\-print-capabilities Print back-end capabilities in JSON format, only meaningful for vhost-user mode. +.TP +.BR \-\-repair-path " " \fIpath +Path for UNIX domain socket used by the \fBpasst-repair\fR(1) helper to connect +to \fBpasst\fR in order to set or clear the TCP_REPAIR option on sockets, during +migration. \fB--repair-path none\fR disables this interface (if you need to +specify a socket path called "none" you can prefix the path by \fI./\fR). + +Default, for \-\-vhost-user mode only, is to append \fI.repair\fR to the path +chosen for the hypervisor UNIX domain socket. No socket is created if not in +\-\-vhost-user mode. + .TP .BR \-F ", " \-\-fd " " \fIFD Pass a pre-opened, connected socket to \fBpasst\fR. 
Usually the socket is opened diff --git a/passt.c b/passt.c index 8682022..1938290 100644 --- a/passt.c +++ b/passt.c @@ -51,6 +51,7 @@ #include "tcp_splice.h" #include "ndp.h" #include "vu_common.h" +#include "repair.h" #define EPOLL_EVENTS 8 @@ -358,6 +359,12 @@ loop: case EPOLL_TYPE_VHOST_KICK: vu_kick_cb(c.vdev, ref, &now); break; + case EPOLL_TYPE_REPAIR_LISTEN: + repair_listen_handler(&c, eventmask); + break; + case EPOLL_TYPE_REPAIR: + repair_handler(&c, eventmask); + break; default: /* Can't happen */ ASSERT(0); diff --git a/passt.h b/passt.h index 2255f18..4189a4a 100644 --- a/passt.h +++ b/passt.h @@ -20,6 +20,7 @@ union epoll_ref; #include "siphash.h" #include "ip.h" #include "inany.h" +#include "migrate.h" #include "flow.h" #include "icmp.h" #include "fwd.h" @@ -193,6 +194,7 @@ struct ip6_ctx { * @foreground: Run in foreground, don't log to stderr by default * @nofile: Maximum number of open files (ulimit -n) * @sock_path: Path for UNIX domain socket + * @repair_path: TCP_REPAIR helper path, can be "none", empty for default * @pcap: Path for packet capture file * @pidfile: Path to PID file, empty string if not configured * @pidfile_fd: File descriptor for PID file, -1 if none @@ -203,6 +205,8 @@ struct ip6_ctx { * @epollfd: File descriptor for epoll instance * @fd_tap_listen: File descriptor for listening AF_UNIX socket, if any * @fd_tap: AF_UNIX socket, tuntap device, or pre-opened socket + * @fd_repair_listen: File descriptor for listening TCP_REPAIR socket, if any + * @fd_repair: Connected AF_UNIX socket for TCP_REPAIR helper * @our_tap_mac: Pasta/passt's MAC on the tap link * @guest_mac: MAC address of guest or namespace, seen or configured * @hash_secret: 128-bit secret for siphash functions @@ -247,6 +251,7 @@ struct ctx { int foreground; int nofile; char sock_path[UNIX_PATH_MAX]; + char repair_path[UNIX_PATH_MAX]; char pcap[PATH_MAX]; char pidfile[PATH_MAX]; @@ -263,6 +268,8 @@ struct ctx { int epollfd; int fd_tap_listen; int fd_tap; + int fd_repair_listen; + int fd_repair; unsigned char our_tap_mac[ETH_ALEN]; unsigned char guest_mac[ETH_ALEN]; uint64_t hash_secret[2]; diff --git a/repair.c b/repair.c new file mode 100644 index 0000000..6151927 --- /dev/null +++ b/repair.c @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: GPL-2.0-or-later + +/* PASST - Plug A Simple Socket Transport + * for qemu/UNIX domain socket mode + * + * PASTA - Pack A Subtle Tap Abstraction + * for network namespace/tap device mode + * + * repair.c - Interface (server) for passt-repair, set/clear TCP_REPAIR + * + * Copyright (c) 2025 Red Hat GmbH + * Author: Stefano Brivio <sbrivio(a)redhat.com> + */ + +#include <errno.h> +#include <sys/uio.h> + +#include "util.h" +#include "ip.h" +#include "passt.h" +#include "inany.h" +#include "flow.h" +#include "flow_table.h" + +#include "repair.h" + +#define SCM_MAX_FD 253 /* From Linux kernel (include/net/scm.h), not in UAPI */ + +static int repair_fds[SCM_MAX_FD]; +static int repair_cmd; +static int repair_nfds; + +/** + * repair_sock_init() - Start listening for connections on helper socket + * @c: Execution context + */ +void repair_sock_init(const struct ctx *c) +{ + union epoll_ref ref = { .type = EPOLL_TYPE_REPAIR_LISTEN }; + struct epoll_event ev = { 0 }; + + listen(c->fd_repair_listen, 0); + + ref.fd = c->fd_repair_listen; + ev.events = EPOLLIN | EPOLLHUP | EPOLLET; + ev.data.u64 = ref.u64; + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, c->fd_repair_listen, &ev); +} + +/** + * repair_listen_handler() - Handle events on TCP_REPAIR helper listening socket + * @c: 
Execution context + * @events: epoll events + */ +void repair_listen_handler(struct ctx *c, uint32_t events) +{ + union epoll_ref ref = { .type = EPOLL_TYPE_REPAIR }; + struct epoll_event ev = { 0 }; + struct ucred ucred; + socklen_t len; + + if (events != EPOLLIN) { + debug("Spurious event 0x%04x on TCP_REPAIR helper socket", + events); + return; + } + + len = sizeof(ucred); + + /* Another client is already connected: accept and close right away. */ + if (c->fd_repair != -1) { + int discard = accept4(c->fd_repair_listen, NULL, NULL, + SOCK_NONBLOCK); + + if (discard == -1) + return; + + if (!getsockopt(discard, SOL_SOCKET, SO_PEERCRED, &ucred, &len)) + info("Discarding TCP_REPAIR helper, PID %i", ucred.pid); + + close(discard); + return; + } + + c->fd_repair = accept4(c->fd_repair_listen, NULL, NULL, 0); + + if (!getsockopt(c->fd_repair, SOL_SOCKET, SO_PEERCRED, &ucred, &len)) + info("Accepted TCP_REPAIR helper, PID %i", ucred.pid); + + ref.fd = c->fd_repair; + ev.events = EPOLLHUP | EPOLLET; + ev.data.u64 = ref.u64; + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, c->fd_repair, &ev); +} + +/** + * repair_close() - Close connection to TCP_REPAIR helper + * @c: Execution context + */ +void repair_close(struct ctx *c) +{ + debug("Closing TCP_REPAIR helper socket"); + + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, c->fd_repair, NULL); + close(c->fd_repair); + c->fd_repair = -1; +} + +/** + * repair_handler() - Handle EPOLLHUP and EPOLLERR on TCP_REPAIR helper socket + * @c: Execution context + * @events: epoll events + */ +void repair_handler(struct ctx *c, uint32_t events) +{ + (void)events; + + repair_close(c); +} + +/** + * repair_flush() - Flush current set of sockets to helper, with current command + * @c: Execution context + * + * Return: 0 on success, negative error code on failure + */ +int repair_flush(struct ctx *c) +{ + struct iovec iov = { &((int8_t){ repair_cmd }), sizeof(int8_t) }; + char buf[CMSG_SPACE(sizeof(int) * SCM_MAX_FD)] + __attribute__ ((aligned(__alignof__(struct cmsghdr)))); + struct cmsghdr *cmsg; + struct msghdr msg; + + if (!repair_nfds) + return 0; + + msg = (struct msghdr){ NULL, 0, &iov, 1, + buf, CMSG_SPACE(sizeof(int) * repair_nfds), 0 }; + cmsg = CMSG_FIRSTHDR(&msg); + + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int) * repair_nfds); + memcpy(CMSG_DATA(cmsg), repair_fds, sizeof(int) * repair_nfds); + + repair_nfds = 0; + + if (sendmsg(c->fd_repair, &msg, 0) < 0) { + int ret = -errno; + err_perror("Failed to send sockets to TCP_REPAIR helper"); + repair_close(c); + return ret; + } + + if (recv(c->fd_repair, &((int8_t){ 0 }), 1, 0) < 0) { + int ret = -errno; + err_perror("Failed to receive reply from TCP_REPAIR helper"); + repair_close(c); + return ret; + } + + return 0; +} + +/** + * repair_flush() - Add socket to TCP_REPAIR set with given command + * @c: Execution context + * @s: Socket to add + * @cmd: TCP_REPAIR_ON, TCP_REPAIR_OFF, or TCP_REPAIR_OFF_NO_WP + * + * Return: 0 on success, negative error code on failure + */ +/* cppcheck-suppress unusedFunction */ +int repair_set(struct ctx *c, int s, int cmd) +{ + int rc; + + if (repair_nfds && repair_cmd != cmd) { + if ((rc = repair_flush(c))) + return rc; + } + + repair_cmd = cmd; + repair_fds[repair_nfds++] = s; + + if (repair_nfds >= SCM_MAX_FD) { + if ((rc = repair_flush(c))) + return rc; + } + + return 0; +} diff --git a/repair.h b/repair.h new file mode 100644 index 0000000..693c515 --- /dev/null +++ b/repair.h @@ -0,0 +1,16 @@ +/* SPDX-License-Identifier: 
GPL-2.0-or-later + * Copyright (c) 2025 Red Hat GmbH + * Author: Stefano Brivio <sbrivio(a)redhat.com> + */ + +#ifndef REPAIR_H +#define REPAIR_H + +void repair_sock_init(const struct ctx *c); +void repair_listen_handler(struct ctx *c, uint32_t events); +void repair_handler(struct ctx *c, uint32_t events); +void repair_close(struct ctx *c); +int repair_flush(struct ctx *c); +int repair_set(struct ctx *c, int s, int cmd); + +#endif /* REPAIR_H */ diff --git a/tap.c b/tap.c index 772648f..3659aab 100644 --- a/tap.c +++ b/tap.c @@ -56,6 +56,7 @@ #include "netlink.h" #include "pasta.h" #include "packet.h" +#include "repair.h" #include "tap.h" #include "log.h" #include "vhost_user.h" @@ -1151,68 +1152,6 @@ void tap_handler_pasta(struct ctx *c, uint32_t events, tap_pasta_input(c, now); } -/** - * tap_sock_unix_open() - Create and bind AF_UNIX socket - * @sock_path: Socket path. If empty, set on return (UNIX_SOCK_PATH as prefix) - * - * Return: socket descriptor on success, won't return on failure - */ -int tap_sock_unix_open(char *sock_path) -{ - int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); - struct sockaddr_un addr = { - .sun_family = AF_UNIX, - }; - int i; - - if (fd < 0) - die_perror("Failed to open UNIX domain socket"); - - for (i = 1; i < UNIX_SOCK_MAX; i++) { - char *path = addr.sun_path; - int ex, ret; - - if (*sock_path) - memcpy(path, sock_path, UNIX_PATH_MAX); - else if (snprintf_check(path, UNIX_PATH_MAX - 1, - UNIX_SOCK_PATH, i)) - die_perror("Can't build UNIX domain socket path"); - - ex = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, - 0); - if (ex < 0) - die_perror("Failed to check for UNIX domain conflicts"); - - ret = connect(ex, (const struct sockaddr *)&addr, sizeof(addr)); - if (!ret || (errno != ENOENT && errno != ECONNREFUSED && - errno != EACCES)) { - if (*sock_path) - die("Socket path %s already in use", path); - - close(ex); - continue; - } - close(ex); - - unlink(path); - ret = bind(fd, (const struct sockaddr *)&addr, sizeof(addr)); - if (*sock_path && ret) - die_perror("Failed to bind UNIX domain socket"); - - if (!ret) - break; - } - - if (i == UNIX_SOCK_MAX) - die_perror("Failed to bind UNIX domain socket"); - - info("UNIX domain socket bound at %s", addr.sun_path); - if (!*sock_path) - memcpy(sock_path, addr.sun_path, UNIX_PATH_MAX); - - return fd; -} - /** * tap_backend_show_hints() - Give help information to start QEMU * @c: Execution context @@ -1423,6 +1362,8 @@ void tap_backend_init(struct ctx *c) tap_sock_tun_init(c); break; case MODE_VU: + repair_sock_init(c); + /* fall through */ case MODE_PASST: tap_sock_unix_init(c); diff --git a/util.c b/util.c index 800c6b5..77c8612 100644 --- a/util.c +++ b/util.c @@ -178,6 +178,68 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type, return fd; } +/** + * sock_unix() - Create and bind AF_UNIX socket + * @sock_path: Socket path. 
If empty, set on return (UNIX_SOCK_PATH as prefix) + * + * Return: socket descriptor on success, won't return on failure + */ +int sock_unix(char *sock_path) +{ + int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + int i; + + if (fd < 0) + die_perror("Failed to open UNIX domain socket"); + + for (i = 1; i < UNIX_SOCK_MAX; i++) { + char *path = addr.sun_path; + int ex, ret; + + if (*sock_path) + memcpy(path, sock_path, UNIX_PATH_MAX); + else if (snprintf_check(path, UNIX_PATH_MAX - 1, + UNIX_SOCK_PATH, i)) + die_perror("Can't build UNIX domain socket path"); + + ex = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, + 0); + if (ex < 0) + die_perror("Failed to check for UNIX domain conflicts"); + + ret = connect(ex, (const struct sockaddr *)&addr, sizeof(addr)); + if (!ret || (errno != ENOENT && errno != ECONNREFUSED && + errno != EACCES)) { + if (*sock_path) + die("Socket path %s already in use", path); + + close(ex); + continue; + } + close(ex); + + unlink(path); + ret = bind(fd, (const struct sockaddr *)&addr, sizeof(addr)); + if (*sock_path && ret) + die_perror("Failed to bind UNIX domain socket"); + + if (!ret) + break; + } + + if (i == UNIX_SOCK_MAX) + die_perror("Failed to bind UNIX domain socket"); + + info("UNIX domain socket bound at %s", addr.sun_path); + if (!*sock_path) + memcpy(sock_path, addr.sun_path, UNIX_PATH_MAX); + + return fd; +} + /** * sock_probe_mem() - Check if setting high SO_SNDBUF and SO_RCVBUF is allowed * @c: Execution context diff --git a/util.h b/util.h index 23b165c..dfac63b 100644 --- a/util.h +++ b/util.h @@ -185,6 +185,7 @@ struct ctx; int sock_l4_sa(const struct ctx *c, enum epoll_type type, const void *sa, socklen_t sl, const char *ifname, bool v6only, uint32_t data); +int sock_unix(char *sock_path); void sock_probe_mem(struct ctx *c); long timespec_diff_ms(const struct timespec *a, const struct timespec *b); int64_t timespec_diff_us(const struct timespec *a, const struct timespec *b); -- 2.43.0
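For context, this is roughly what the other end of that protocol has to do.
The sketch below is an editor's illustration based only on repair_flush()
above (one int8_t command plus up to SCM_MAX_FD sockets per message,
answered with a single byte); it is not the actual passt-repair(1) source,
and the exact value carried by the reply byte is an assumption. TCP_REPAIR
may need <linux/tcp.h> with older libc headers.

#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <unistd.h>

#define SCM_MAX_FD 253

/* Receive one batch from passt, set TCP_REPAIR as requested, send ack */
static int handle_batch(int conn)
{
	char buf[CMSG_SPACE(sizeof(int) * SCM_MAX_FD)]
	     __attribute__ ((aligned(__alignof__(struct cmsghdr))));
	int fds[SCM_MAX_FD], nfds, i;
	struct cmsghdr *cmsg;
	struct msghdr msg;
	struct iovec iov;
	int8_t cmd;

	iov = (struct iovec){ &cmd, sizeof(cmd) };
	msg = (struct msghdr){ NULL, 0, &iov, 1, buf, sizeof(buf), 0 };

	if (recvmsg(conn, &msg, 0) <= 0)
		return -1;

	cmsg = CMSG_FIRSTHDR(&msg);
	if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS)
		return -1;

	nfds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
	memcpy(fds, CMSG_DATA(cmsg), nfds * sizeof(int));

	for (i = 0; i < nfds; i++) {
		/* cmd is TCP_REPAIR_ON, TCP_REPAIR_OFF, or
		 * TCP_REPAIR_OFF_NO_WP, applied verbatim
		 */
		if (setsockopt(fds[i], SOL_TCP, TCP_REPAIR,
			       &(int){ cmd }, sizeof(int)))
			cmd = -1;	/* assumed failure marker in the ack */
		close(fds[i]);
	}

	/* One-byte acknowledgment matching the recv() in repair_flush() */
	return send(conn, &cmd, sizeof(cmd), 0) == sizeof(cmd) ? 0 : -1;
}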
Very much draft quality, but it works. Ask passt-repair to switch TCP sockets to repair mode and dump their current sequence numbers to the flow table, which will be transferred and used by the target in the next step. Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- flow.c | 49 +++++++++++++++++++++++++++++++++++++++++++++ flow.h | 2 ++ migrate.c | 15 +++++++------- tcp.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++ tcp_conn.h | 5 +++++ vhost_user.c | 1 + 6 files changed, 121 insertions(+), 7 deletions(-) diff --git a/flow.c b/flow.c index 3b8dd0e..da738eb 100644 --- a/flow.c +++ b/flow.c @@ -19,6 +19,7 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" +#include "repair.h" const char *flow_state_str[] = { [FLOW_STATE_FREE] = "FREE", @@ -870,6 +871,54 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) *last_next = FLOW_MAX; } +/** + * flow_migrate_source_pre() - Prepare all source flows for migration + * @c: Execution context + * @m: Migration metadata + * @stage: Migration stage information (unused) + * @fd: Migration fd (unused) + * + * Return: 0 on success + */ +int flow_migrate_source_pre(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd) +{ + unsigned i; + int rc; + + (void)m; + (void)stage; + (void)fd; + + for (i = 0; i < FLOW_MAX; i++) { /* TODO: iterator with skip */ + union flow *flow = &flowtab[i]; + + if (flow->f.state == FLOW_STATE_FREE) + i += flow->free.n - 1; + else if (flow->f.state == FLOW_STATE_ACTIVE && + flow->f.type == FLOW_TCP) + rc = tcp_flow_repair_on(c, &flow->tcp); + + if (rc) + return rc; /* TODO: rollback */ + } + + if ((rc = repair_flush(c))) /* TODO: move to TCP logic */ + return rc; + + for (i = 0; i < FLOW_MAX; i++) { /* TODO: iterator with skip */ + union flow *flow = &flowtab[i]; + + if (flow->f.state == FLOW_STATE_FREE) + i += flow->free.n - 1; + else if (flow->f.state == FLOW_STATE_ACTIVE && + flow->f.type == FLOW_TCP) + tcp_flow_dump_seq(c, &flow->tcp); + } + + return 0; +} + /** * flow_init() - Initialise flow related data structures */ diff --git a/flow.h b/flow.h index 24ba3ef..c526938 100644 --- a/flow.h +++ b/flow.h @@ -249,6 +249,8 @@ union flow; void flow_init(void); void flow_defer_handler(const struct ctx *c, const struct timespec *now); +int flow_migrate_source_pre(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd); void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...) 
__attribute__((format(printf, 3, 4))); diff --git a/migrate.c b/migrate.c index f64ffc8..b42b12f 100644 --- a/migrate.c +++ b/migrate.c @@ -21,9 +21,9 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" -#include "repair.h" #include "migrate.h" +#include "repair.h" /* Current version of migration data */ #define MIGRATE_VERSION 1 @@ -104,6 +104,7 @@ static int migrate_recv_block(struct ctx *c, struct migrate_meta *m, static const struct migrate_stage stages_v1[] = { { .name = "flow pre", + .source = flow_migrate_source_pre, .target = NULL, }, DATA_STAGE(flow_first_free), @@ -129,7 +130,7 @@ static const struct migrate_version versions[] = { * * Return: 0 on success, positive error code on failure */ -static int migrate_source(struct ctx *c, int fd) +int migrate_source(struct ctx *c, int fd) { struct migrate_meta m; unsigned i; @@ -220,7 +221,7 @@ static int migrate_target_read_header(int fd, struct migrate_meta *m) * * Return: 0 on success, positive error code on failure */ -static int migrate_target(struct ctx *c, int fd) +int migrate_target(struct ctx *c, int fd) { struct migrate_meta m; unsigned i; @@ -258,6 +259,7 @@ void migrate_init(struct ctx *c) { c->device_state_fd = -1; c->device_state_result = -1; + repair_sock_init(c); } /** @@ -268,6 +270,7 @@ void migrate_close(struct ctx *c) { if (c->device_state_fd != -1) { debug("Closing migration channel, fd: %d", c->device_state_fd); + epoll_del(c, c->device_state_fd); close(c->device_state_fd); c->device_state_fd = -1; c->device_state_result = -1; @@ -282,14 +285,12 @@ void migrate_close(struct ctx *c) */ void migrate_request(struct ctx *c, int fd, bool target) { - debug("Migration requested, fd: %d (was %d)", - fd, c->device_state_fd); + debug("Migration requested, fd: %d", c->device_state_fd); if (c->device_state_fd != -1) migrate_close(c); c->device_state_fd = fd; - c->migrate_target = target; } /** @@ -304,7 +305,7 @@ void migrate_handler(struct ctx *c) return; debug("migrate_handler fd %d target %d", - c->device_state_fd, c->migrate_target); + c->device_state_fd, c->migrate_target); if (c->migrate_target) rc = migrate_target(c, c->device_state_fd); diff --git a/tcp.c b/tcp.c index fac322c..96d7649 100644 --- a/tcp.c +++ b/tcp.c @@ -299,6 +299,7 @@ #include "log.h" #include "inany.h" #include "flow.h" +#include "repair.h" #include "linux_dep.h" #include "flow_table.h" @@ -868,6 +869,61 @@ void tcp_defer_handler(struct ctx *c) tcp_payload_flush(c); } +/** + * tcp_flow_repair_on() - Enable repair mode for a single TCP flow + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_ON))) + err("Failed to set TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_dump_seq() - Dump sequences for send and receive queues + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_dump_seq(struct ctx *c, struct tcp_tap_conn *conn) +{ + int v, s = conn->sock; + socklen_t vlen; + + (void)c; + + vlen = sizeof(v); + + v = TCP_SEND_QUEUE; + /* TODO: proper error management and prints */ + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, vlen)) + return -errno; + + if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_snd, &vlen)) + return -errno; + + debug("Send queue sequence %u for socket %i", conn->sock_seq_snd, 
s); + + v = TCP_RECV_QUEUE; + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, vlen)) + return -errno; + + if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_rcv, &vlen)) + return -errno; + + debug("Receive queue sequence %u for socket %i", conn->sock_seq_rcv, s); + + return 0; +} + /** * tcp_fill_header() - Fill the TCP header fields for a given TCP segment. * diff --git a/tcp_conn.h b/tcp_conn.h index d342680..0c3e197 100644 --- a/tcp_conn.h +++ b/tcp_conn.h @@ -94,6 +94,9 @@ struct tcp_tap_conn { uint32_t seq_from_tap; uint32_t seq_ack_to_tap; uint32_t seq_init_from_tap; + + uint32_t sock_seq_snd; + uint32_t sock_seq_rcv; }; /** @@ -140,6 +143,8 @@ extern int init_sock_pool4 [TCP_SOCK_POOL_SIZE]; extern int init_sock_pool6 [TCP_SOCK_POOL_SIZE]; bool tcp_flow_defer(const struct tcp_tap_conn *conn); +int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn); +int tcp_flow_dump_seq(struct ctx *c, struct tcp_tap_conn *conn); bool tcp_splice_flow_defer(struct tcp_splice_conn *conn); void tcp_splice_timer(const struct ctx *c, struct tcp_splice_conn *conn); int tcp_conn_pool_sock(int pool[]); diff --git a/vhost_user.c b/vhost_user.c index b107d0f..63f745c 100644 --- a/vhost_user.c +++ b/vhost_user.c @@ -44,6 +44,7 @@ #include "tap.h" #include "vhost_user.h" #include "pcap.h" +#include "migrate.h" /* vhost-user version we are compatible with */ #define VHOST_USER_VERSION 1 -- 2.43.0
On Tue, Feb 04, 2025 at 01:47:43AM +0100, Stefano Brivio wrote:
> Very much draft quality, but it works.
>
> Ask passt-repair to switch TCP sockets to repair mode and dump their
> current sequence numbers to the flow table, which will be transferred
> and used by the target in the next step.

[snip]

> @@ -268,6 +270,7 @@ void migrate_close(struct ctx *c)
>  {
>  	if (c->device_state_fd != -1) {
>  		debug("Closing migration channel, fd: %d", c->device_state_fd);
> +		epoll_del(c, c->device_state_fd);

You have stray revert hunks here again, which break things horribly,
and that makes me not confident that I'm actually testing the same
code you are.

>  		close(c->device_state_fd);
>  		c->device_state_fd = -1;
>  		c->device_state_result = -1;
> @@ -282,14 +285,12 @@ void migrate_close(struct ctx *c)
>   */
>  void migrate_request(struct ctx *c, int fd, bool target)
>  {
> -	debug("Migration requested, fd: %d (was %d)",
> -	      fd, c->device_state_fd);
> +	debug("Migration requested, fd: %d", c->device_state_fd);
>
>  	if (c->device_state_fd != -1)
>  		migrate_close(c);
>
>  	c->device_state_fd = fd;
> -	c->migrate_target = target;

And here.

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson
On Tue, 4 Feb 2025 14:43:07 +1100
David Gibson <david(a)gibson.dropbear.id.au> wrote:

> On Tue, Feb 04, 2025 at 01:47:43AM +0100, Stefano Brivio wrote:
> > Very much draft quality, but it works.
> >
> > Ask passt-repair to switch TCP sockets to repair mode and dump their
> > current sequence numbers to the flow table, which will be transferred
> > and used by the target in the next step.
>
> [snip]
>
> > @@ -268,6 +270,7 @@ void migrate_close(struct ctx *c)
> >  {
> >  	if (c->device_state_fd != -1) {
> >  		debug("Closing migration channel, fd: %d", c->device_state_fd);
> > +		epoll_del(c, c->device_state_fd);
>
> You have stray revert hunks here again, which break things horribly,
> and that makes me not confident that I'm actually testing the same
> code you are.

Well, folding "[PATCH 6/6] migrate: Make migration handlers simpler and
more flexible" into the first patches, where it belongs, was a lot of
fun.

As I mentioned, I didn't test this one, I just build-tested the final
result.

Can you please post the fixed rebase...?

> >  		close(c->device_state_fd);
> >  		c->device_state_fd = -1;
> >  		c->device_state_result = -1;
> > @@ -282,14 +285,12 @@ void migrate_close(struct ctx *c)
> >   */
> >  void migrate_request(struct ctx *c, int fd, bool target)
> >  {
> > -	debug("Migration requested, fd: %d (was %d)",
> > -	      fd, c->device_state_fd);
> > +	debug("Migration requested, fd: %d", c->device_state_fd);
> >
> >  	if (c->device_state_fd != -1)
> >  		migrate_close(c);
> >
> >  	c->device_state_fd = fd;
> > -	c->migrate_target = target;
>
> And here.

-- 
Stefano
On Tue, Feb 04, 2025 at 07:44:10AM +0100, Stefano Brivio wrote:
> On Tue, 4 Feb 2025 14:43:07 +1100
> David Gibson <david(a)gibson.dropbear.id.au> wrote:
> > On Tue, Feb 04, 2025 at 01:47:43AM +0100, Stefano Brivio wrote:
> > > Very much draft quality, but it works.
> > >
> > > Ask passt-repair to switch TCP sockets to repair mode and dump their
> > > current sequence numbers to the flow table, which will be transferred
> > > and used by the target in the next step.
> >
> > [snip]
> >
> > > @@ -268,6 +270,7 @@ void migrate_close(struct ctx *c)
> > >  {
> > >  	if (c->device_state_fd != -1) {
> > >  		debug("Closing migration channel, fd: %d", c->device_state_fd);
> > > +		epoll_del(c, c->device_state_fd);
> >
> > You have stray revert hunks here again, which break things horribly,
> > and that makes me not confident that I'm actually testing the same
> > code you are.
>
> Well, folding "[PATCH 6/6] migrate: Make migration handlers simpler and
> more flexible" into the first patches, where it belongs, was a lot of
> fun.

Heh, fair point.

> As I mentioned, I didn't test this one, I just build-tested the final
> result.
>
> Can you please post the fixed rebase...?

Ah, yeah, I've been thinking this way of posting patches back and forth
hasn't been working great for this rapid iteration.  On the other hand,
I was reluctant to do the patch folding myself for fear of clobbering
changes you were making concurrently to the same patches.

I'll try posting the whole rebase and see how we go.

> > >  		close(c->device_state_fd);
> > >  		c->device_state_fd = -1;
> > >  		c->device_state_result = -1;
> > > @@ -282,14 +285,12 @@ void migrate_close(struct ctx *c)
> > >   */
> > >  void migrate_request(struct ctx *c, int fd, bool target)
> > >  {
> > > -	debug("Migration requested, fd: %d (was %d)",
> > > -	      fd, c->device_state_fd);
> > > +	debug("Migration requested, fd: %d", c->device_state_fd);
> > >
> > >  	if (c->device_state_fd != -1)
> > >  		migrate_close(c);
> > >
> > >  	c->device_state_fd = fd;
> > > -	c->migrate_target = target;
> >
> > And here.

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson
On migration, the source process asks passt-helper to set TCP sockets in repair mode, dumps the information we need to migrate connections, and closes them. At this point, we can't pass them back to passt-helper using SCM_RIGHTS, because they are closed, from that perspective, and sendmsg() will give us EBADF. But if we don't clear repair mode, the port they are bound to will not be available for binding in the target. Terminate once we're done with the migration and we reported the state. This is equivalent to clearing repair mode on the sockets we just closed. Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- vhost_user.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/vhost_user.c b/vhost_user.c index 63f745c..8077f1d 100644 --- a/vhost_user.c +++ b/vhost_user.c @@ -998,6 +998,8 @@ static bool vu_send_rarp_exec(struct vu_dev *vdev, return false; } +static bool quit_on_device_state = false; + /** * vu_set_device_state_fd_exec() - Set the device state migration channel * @vdev: vhost-user device @@ -1025,6 +1027,9 @@ static bool vu_set_device_state_fd_exec(struct vu_dev *vdev, migrate_request(vdev->context, msg->fds[0], direction == VHOST_USER_TRANSFER_STATE_DIRECTION_LOAD); + if (direction == VHOST_USER_TRANSFER_STATE_DIRECTION_SAVE) + quit_on_device_state = true; + /* We don't provide a new fd for the data transfer */ vmsg_set_reply_u64(msg, VHOST_USER_VRING_NOFD_MASK); @@ -1202,4 +1207,10 @@ void vu_control_handler(struct vu_dev *vdev, int fd, uint32_t events) if (reply_requested) vu_send_reply(fd, &msg); + + if (quit_on_device_state && + msg.hdr.request == VHOST_USER_CHECK_DEVICE_STATE) { + info("Migration complete, exiting"); + exit(EXIT_SUCCESS); + } } -- 2.43.0
It's draft quality, with a number of hacks, and it will need a partial rewrite. Add: - flow_migrate_target_post(), to open target-side sockets and bind them, switch them to repair mode, connect them, and make them leave repair mode again - copies of flow table, 'flow_first_free' pointer, related hash table, and hash secret. The copy of the hash secret shows that the current declarative approach to data sections has some drawbacks Change tcp_flow_dump_seq() into tcp_flow_repair_seq(), which can dump as well as restore sequences (used before connecting sockets). Once we connect sockets, before we take them out of repair mode, we need to restore MSS and window scaling information (what would be determined by TCP options on handshake). I'm using hardcoded values as we don't have a way to transfer these bits of socket-side information. Before we turn repair mode off, add sockets to the epoll list and set up per-socket timerfd descriptors, with initial timer scheduling. Signed-off-by: Stefano Brivio <sbrivio(a)redhat.com> --- flow.c | 51 ++++++++++++++++++++- flow.h | 2 + migrate.c | 1 + passt.c | 4 ++ passt.h | 2 + tcp.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++---- tcp_conn.h | 4 +- 7 files changed, 185 insertions(+), 11 deletions(-) diff --git a/flow.c b/flow.c index da738eb..aaf200f 100644 --- a/flow.c +++ b/flow.c @@ -913,12 +913,61 @@ int flow_migrate_source_pre(struct ctx *c, struct migrate_meta *m, i += flow->free.n - 1; else if (flow->f.state == FLOW_STATE_ACTIVE && flow->f.type == FLOW_TCP) - tcp_flow_dump_seq(c, &flow->tcp); + tcp_flow_repair_seq(c, &flow->tcp, false); } return 0; } +/** + * flow_migrate_target_post() - Restore all flows after migration + * @c: Execution context + * @m: Migration metadata + * @stage: Migration stage information (unused) + * @fd: Migration fd (unused) + * + * Return: 0 on success + */ +int flow_migrate_target_post(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd) +{ + unsigned i; + int rc; + + (void)m; + (void)stage; + (void)fd; + + for (i = 0; i < FLOW_MAX; i++) { /* TODO: iterator with skip */ + union flow *flow = &flowtab[i]; + + if (flow->f.state == FLOW_STATE_FREE) + i += flow->free.n - 1; + else if (flow->f.state == FLOW_STATE_ACTIVE && + flow->f.type == FLOW_TCP) + rc = tcp_flow_repair_socket(c, &flow->tcp); + + if (rc) + return rc; /* TODO: rollback */ + } + + repair_flush(c); /* TODO: move to TCP logic */ + + for (i = 0; i < FLOW_MAX; i++) { /* TODO: iterator with skip */ + union flow *flow = &flowtab[i]; + + if (flow->f.state == FLOW_STATE_FREE) + i += flow->free.n - 1; + else if (flow->f.state == FLOW_STATE_ACTIVE && + flow->f.type == FLOW_TCP) + tcp_flow_repair_connect(c, &flow->tcp); + } + + repair_flush(c); /* TODO: move to TCP logic */ + + return 0; +} + /** * flow_init() - Initialise flow related data structures */ diff --git a/flow.h b/flow.h index c526938..fec124f 100644 --- a/flow.h +++ b/flow.h @@ -251,6 +251,8 @@ void flow_init(void); void flow_defer_handler(const struct ctx *c, const struct timespec *now); int flow_migrate_source_pre(struct ctx *c, struct migrate_meta *m, const struct migrate_stage *stage, int fd); +int flow_migrate_target_post(struct ctx *c, struct migrate_meta *m, + const struct migrate_stage *stage, int fd); void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...) 
__attribute__((format(printf, 3, 4))); diff --git a/migrate.c b/migrate.c index b42b12f..2af7f64 100644 --- a/migrate.c +++ b/migrate.c @@ -113,6 +113,7 @@ static const struct migrate_stage stages_v1[] = { { .name = "flow post", .source = NULL, + .target = flow_migrate_target_post, }, }; diff --git a/passt.c b/passt.c index 1938290..65e9126 100644 --- a/passt.c +++ b/passt.c @@ -119,6 +119,8 @@ static void post_handler(struct ctx *c, const struct timespec *now) ndp_timer(c, now); } +uint64_t g_hash_secret[2]; + /** * random_init() - Initialise things based on random data * @c: Execution context @@ -130,6 +132,8 @@ static void random_init(struct ctx *c) /* Create secret value for SipHash calculations */ raw_random(&c->hash_secret, sizeof(c->hash_secret)); + memcpy(g_hash_secret, c->hash_secret, sizeof(g_hash_secret)); + /* Seed pseudo-RNG for things that need non-cryptographic random */ raw_random(&seed, sizeof(seed)); srandom(seed); diff --git a/passt.h b/passt.h index 4189a4a..6010f92 100644 --- a/passt.h +++ b/passt.h @@ -317,6 +317,8 @@ struct ctx { bool migrate_target; }; +extern uint64_t g_hash_secret[2]; + void proto_update_l2_buf(const unsigned char *eth_d, const unsigned char *eth_s); diff --git a/tcp.c b/tcp.c index 96d7649..3760e67 100644 --- a/tcp.c +++ b/tcp.c @@ -887,13 +887,31 @@ int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn) } /** - * tcp_flow_dump_seq() - Dump sequences for send and receive queues + * tcp_flow_repair_off() - Clear repair mode for a single TCP flow * @c: Execution context * @conn: Pointer to the TCP connection structure * * Return: 0 on success, negative error code on failure */ -int tcp_flow_dump_seq(struct ctx *c, struct tcp_tap_conn *conn) +static int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_OFF))) + err("Failed to clear TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_repair_seq() - Dump or set sequences for socket queues + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * @set: Set if true, dump if false + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_seq(struct ctx *c, struct tcp_tap_conn *conn, bool set) { int v, s = conn->sock; socklen_t vlen; @@ -902,28 +920,124 @@ int tcp_flow_dump_seq(struct ctx *c, struct tcp_tap_conn *conn) vlen = sizeof(v); - v = TCP_SEND_QUEUE; /* TODO: proper error management and prints */ - if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, vlen)) - return -errno; - if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_snd, &vlen)) + v = TCP_SEND_QUEUE; + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, vlen)) return -errno; - debug("Send queue sequence %u for socket %i", conn->sock_seq_snd, s); + if (set) { + if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_snd, + vlen)) + return -errno; + debug("Set send queue sequence for socket %i to %u", + s, conn->sock_seq_snd); + } else { + if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_snd, + &vlen)) + return -errno; + debug("Dumped send queue sequence for socket %i: %u", + s, conn->sock_seq_snd); + } v = TCP_RECV_QUEUE; if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, vlen)) return -errno; - if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_rcv, &vlen)) + if (set) { + if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_rcv, + vlen)) + return -errno; + debug("Set receive queue sequence for socket %i to %u", + s, conn->sock_seq_rcv); + } else { + if 
(getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, &conn->sock_seq_rcv, + &vlen)) + return -errno; + debug("Dumped receive queue sequence for socket %i: %u", + s, conn->sock_seq_rcv); + } + + return 0; +} + +/** + * tcp_flow_repair_socket() - Open and bind socket, request repair mode + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn) +{ + sa_family_t af = CONN_V4(conn) ? AF_INET : AF_INET6; + const struct flowside *sockside = HOSTFLOW(conn); + struct sockaddr_in a; + int rc; + + a = (struct sockaddr_in){ af, htons(sockside->oport), { 0 }, { 0 } }; + + if ((conn->sock = socket(af, SOCK_STREAM, IPPROTO_TCP)) < 0) + return -errno; + + /* On the same host, source socket can be in TIME_WAIT */ + setsockopt(conn->sock, SOL_SOCKET, SO_REUSEADDR, + &((int){ 1 }), sizeof(int)); + + if (bind(conn->sock, (struct sockaddr *)&a, sizeof(a)) < 0) { + close(conn->sock); + conn->sock = -1; return -errno; + } - debug("Receive queue sequence %u for socket %i", conn->sock_seq_rcv, s); + rc = tcp_flow_repair_on(c, conn); + if (rc) { + close(conn->sock); + conn->sock = -1; + return rc; + } return 0; } +/** + * tcp_flow_repair_connect() - Connect sockets in repair mode, then turn it off + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_connect(struct ctx *c, struct tcp_tap_conn *conn) +{ + struct flowside *tgt = &conn->f.side[TGTSIDE]; + struct tcp_repair_opt opts[2]; + + tcp_flow_repair_seq(c, conn, true); + + flowside_connect(c, conn->sock, PIF_HOST, tgt); + + /* FIXME: Fetch those with TCP_REPAIR_OPTIONS and store in migration + * data. These hardcoded values just happen to be good enough. + * + * On top of these, to seamlessly restore the window, we also need to + * dump and restore struct tcp_repair_window via TCP_REPAIR_WINDOW. + */ + opts[0].opt_code = TCPOPT_WINDOW; + opts[0].opt_val = 8 + (8 << 16); + + opts[1].opt_code = TCPOPT_MAXSEG; + opts[1].opt_val = 65495; + + setsockopt(conn->sock, SOL_TCP, TCP_REPAIR_OPTIONS, + opts, 2 * sizeof(struct tcp_repair_opt)); + + conn->in_epoll = 0; + conn->timer = -1; + tcp_epoll_ctl(c, conn); + + return tcp_flow_repair_off(c, conn); +} + /** * tcp_fill_header() - Fill the TCP header fields for a given TCP segment. * diff --git a/tcp_conn.h b/tcp_conn.h index 0c3e197..3bf8837 100644 --- a/tcp_conn.h +++ b/tcp_conn.h @@ -144,7 +144,9 @@ extern int init_sock_pool6 [TCP_SOCK_POOL_SIZE]; bool tcp_flow_defer(const struct tcp_tap_conn *conn); int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn); -int tcp_flow_dump_seq(struct ctx *c, struct tcp_tap_conn *conn); +int tcp_flow_repair_seq(struct ctx *c, struct tcp_tap_conn *conn, bool set); +int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn); +int tcp_flow_repair_connect(struct ctx *c, struct tcp_tap_conn *conn); bool tcp_splice_flow_defer(struct tcp_splice_conn *conn); void tcp_splice_timer(const struct ctx *c, struct tcp_splice_conn *conn); int tcp_conn_pool_sock(int pool[]); -- 2.43.0
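The restore path in tcp_flow_repair_socket() and tcp_flow_repair_connect() has a strict ordering: bind and enter repair mode first, write the queue sequences while the socket is still closed, connect() (which, in repair mode, recreates the connection without emitting a SYN), put back the handshake-negotiated options, and only then leave repair mode. A condensed sketch of that order follows, with the same caveats as before (direct CAP_NET_ADMIN instead of passt-repair, a made-up function name, and hypothetical seq_snd/seq_rcv/wscale/mss parameters standing in for the migrated data; error handling omitted):

/* Sketch of the target-side restore order; not passt code. */
#include <stdint.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <linux/tcp.h>		/* TCP_REPAIR*, struct tcp_repair_opt */

#define OPT_MAXSEG	2	/* TCPOPT_MAXSEG in <netinet/tcp.h> */
#define OPT_WINDOW	3	/* TCPOPT_WINDOW in <netinet/tcp.h> */

static int repair_restore_conn(const struct sockaddr_in *self,
			       const struct sockaddr_in *peer,
			       uint32_t seq_snd, uint32_t seq_rcv,
			       uint32_t wscale, uint32_t mss)
{
	struct tcp_repair_opt opts[2];
	int v, s;

	s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

	/* The source-side binding may still linger (e.g. TIME_WAIT) */
	setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int));
	bind(s, (const struct sockaddr *)self, sizeof(*self));

	v = TCP_REPAIR_ON;
	setsockopt(s, IPPROTO_TCP, TCP_REPAIR, &v, sizeof(v));

	/* Queue sequences must be set while still closed, before connect() */
	v = TCP_SEND_QUEUE;
	setsockopt(s, IPPROTO_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v));
	setsockopt(s, IPPROTO_TCP, TCP_QUEUE_SEQ, &seq_snd, sizeof(seq_snd));

	v = TCP_RECV_QUEUE;
	setsockopt(s, IPPROTO_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v));
	setsockopt(s, IPPROTO_TCP, TCP_QUEUE_SEQ, &seq_rcv, sizeof(seq_rcv));

	/* In repair mode, connect() doesn't emit a SYN: it just recreates
	 * the established connection state in the kernel.
	 */
	connect(s, (const struct sockaddr *)peer, sizeof(*peer));

	/* Put back what the handshake would have negotiated */
	opts[0].opt_code = OPT_WINDOW;
	opts[0].opt_val = wscale + (wscale << 16);	/* snd | rcv << 16 */
	opts[1].opt_code = OPT_MAXSEG;
	opts[1].opt_val = mss;
	setsockopt(s, IPPROTO_TCP, TCP_REPAIR_OPTIONS, opts, sizeof(opts));

	v = TCP_REPAIR_OFF;
	setsockopt(s, IPPROTO_TCP, TCP_REPAIR, &v, sizeof(v));

	return s;
}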
On Tue, Feb 04, 2025 at 01:47:37AM +0100, Stefano Brivio wrote:
> I applied what I could, and squashed a number patches, including those
> from "[PATCH 0/6] More migration improvements".
>
> I didn't test the full flow here.

With this patch set, I'm still unable to migrate a connection.

First there are some simple things:

 * There are, again, some bogus revert hunks in the series which I had
   to remove.  See comments.
 * There are some bugs in the already committed patches; I posted a
   couple of fixes for those.

Then, debugging the actual migration problem: the first issue I
encountered was that the connect() on the target was failing with
EADDRNOTAVAIL.  I think this is a timing issue - the exit of the source
instance isn't happening quite in time to free up the port.

I was able to work around that by closing the source side socket
immediately after extracting its info with tcp_repair, instead of
postponing that to the exit in check_device_state.

To address this properly, I think we need to do a couple of things:

 * Treat the source-side point of no return as being at the end of
   migrate_source().  Close our sockets and purge flows at that point.
 * On the target side, defer actually re-opening/repairing the sockets
   until check_state, or even send_rarp time.

Laurent, if you have some insight (or can find out) into what guarantees
we have in terms of which points on the target come after which points
on the source, that would be useful to check.

With the hacky workaround, the connect() now succeeds, but I still
couldn't send data from the target.  From strace, the target side passt
doesn't seem to be attempting any writes to the socket, but I'm not
sure why yet.  A pcap on the target side does show some packets on the
stream coming from the guest, although it's only one byte (0x0a) and I
thought I'd typed 5ish.  It then appears to retransmit several times
without getting an ack from passt.

I'll keep debugging tomorrow if you don't have an insight.

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson
On Tue, 4 Feb 2025 17:01:04 +1100
David Gibson <david(a)gibson.dropbear.id.au> wrote:

> With the hacky workaround, the connect() now succeeds, but I still
> couldn't send data from the target.  From strace, the target side
> passt doesn't seem to be attempting any writes to the socket, but I'm
> not sure why yet.  A pcap on the target side does show some packets on
> the stream coming from the guest, although it's only one byte (0x0a)
> and I thought I'd typed 5ish.  It then appears to retransmit several
> times without getting an ack from passt.

It looks like an issue with the window: I had something similar before
it occurred to me that I had to set the window scaling.  Perhaps my
hardcoded value isn't good enough after all.

Can you share the pcap?

-- 
Stefano
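For completeness, the window state that the FIXME comment in tcp_flow_repair_connect() refers to can in principle be transferred with the TCP_REPAIR_WINDOW socket option (Linux 4.8 and later), rather than relying on a hardcoded window scale. A minimal sketch, not part of this series, with made-up helper names: dump on the source while the socket is in repair mode, restore on the target after connect() and before clearing repair mode.

/* Sketch only: move struct tcp_repair_window between source and target. */
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <linux/tcp.h>		/* TCP_REPAIR_WINDOW, struct tcp_repair_window */

static int repair_window_dump(int s, struct tcp_repair_window *w)
{
	socklen_t len = sizeof(*w);

	if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, w, &len))
		return -errno;

	return 0;
}

static int repair_window_restore(int s, const struct tcp_repair_window *w)
{
	if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, w, sizeof(*w)))
		return -errno;

	return 0;
}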