The flow table free list, hash table, and flow_new_entry are global
shared state accessed from multiple threads.
Protect the free list and hash table with a new pthread_rwlock_t,
flow_lock: flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
flow_hash_remove(), and the free list rebuild in flow_defer_handler()
take it as writers, and flowside_lookup() takes it as a reader.
Make flow_new_entry _Thread_local so each thread independently tracks
its own in-progress allocation.
Since the lock is released between flow_alloc() and flow_activate(),
other threads can observe intermediate flow states (NEW, INI, TGT,
TYPED) during traversal. Adapt flow_foreach() and flow_defer_handler()
accordingly: skip these entries silently rather than treating them as
errors, and break free-list cluster merging across them.
Filter flow_defer_handler()'s first loop by qpair, so each thread
only processes its own flows.
Signed-off-by: Laurent Vivier
---
flow.c | 59 +++++++++++++++++++++++++++++++++++++++++++---------
flow_table.h | 2 +-
2 files changed, 50 insertions(+), 11 deletions(-)
diff --git a/flow.c b/flow.c
index 08c7620c7b0f..149360c3ec87 100644
--- a/flow.c
+++ b/flow.c
@@ -12,6 +12,8 @@
#include
#include
+#include <pthread.h>
+
#include "util.h"
#include "ip.h"
#include "passt.h"
@@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
unsigned flow_first_free;
union flow flowtab[FLOW_MAX];
-static const union flow *flow_new_entry; /* = NULL */
+static _Thread_local const union flow *flow_new_entry; /* = NULL */
int qpair_to_fd[FLOW_QPAIR_SIZE];
/* Hash table to index it */
@@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
"Safe linear probing requires hash table with more entries than the number of sides in the flow table");
+static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
+
/** flowside_from_af() - Initialise flowside from addresses
* @side: flowside to initialise
* @af: Address family (AF_INET or AF_INET6)
@@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
*/
union flow *flow_alloc(void)
{
- union flow *flow = &flowtab[flow_first_free];
+ union flow *flow;
+
+ pthread_rwlock_wrlock(&flow_lock);
+
+ flow = &flowtab[flow_first_free];
assert(!flow_new_entry);
- if (flow_first_free >= FLOW_MAX)
+ if (flow_first_free >= FLOW_MAX) {
+ pthread_rwlock_unlock(&flow_lock);
return NULL;
+ }
assert(flow->f.state == FLOW_STATE_FREE);
assert(flow->f.type == FLOW_TYPE_NONE);
@@ -650,6 +660,8 @@ union flow *flow_alloc(void)
memset(flow, 0, sizeof(*flow));
flow_set_state(&flow->f, FLOW_STATE_NEW);
+ pthread_rwlock_unlock(&flow_lock);
+
return flow;
}
@@ -661,6 +673,8 @@ union flow *flow_alloc(void)
*/
void flow_alloc_cancel(union flow *flow)
{
+ pthread_rwlock_wrlock(&flow_lock);
+
assert(flow_new_entry == flow);
assert(flow->f.state == FLOW_STATE_NEW ||
flow->f.state == FLOW_STATE_INI ||
@@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
flow->free.next = flow_first_free;
flow_first_free = FLOW_IDX(flow);
flow_new_entry = NULL;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
{
uint64_t hash = flow_sidx_hash(c, sidx);
- unsigned b = flow_hash_probe_(hash, sidx);
+ unsigned b;
+ pthread_rwlock_wrlock(&flow_lock);
+ b = flow_hash_probe_(hash, sidx);
flow_hashtab[b] = sidx;
+ pthread_rwlock_unlock(&flow_lock);
+
flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
sidx.sidei, b);
@@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
*/
void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
{
- unsigned b = flow_hash_probe(c, sidx), s;
+ unsigned b, s;
- if (!flow_sidx_valid(flow_hashtab[b]))
+ pthread_rwlock_wrlock(&flow_lock);
+
+ b = flow_hash_probe(c, sidx);
+
+ if (!flow_sidx_valid(flow_hashtab[b])) {
+ pthread_rwlock_unlock(&flow_lock);
return; /* Redundant remove */
+ }
flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
sidx.sidei, b);
@@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
}
flow_hashtab[b] = FLOW_SIDX_NONE;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
uint8_t pif, const struct flowside *side)
{
- flow_sidx_t sidx;
+ flow_sidx_t sidx, ret;
union flow *flow;
unsigned b;
+ pthread_rwlock_rdlock(&flow_lock);
+
b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
!(FLOW_PROTO(&flow->f) == proto &&
@@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
flowside_eq(&flow->f.side[sidx.sidei], side)))
b = mod_sub(b, 1, FLOW_HASH_SIZE);
- return flow_hashtab[b];
+ ret = flow_hashtab[b];
+
+ pthread_rwlock_unlock(&flow_lock);
+
+ return ret;
}
/**
@@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
flow_foreach(flow) {
bool closed = false;
+ if (flow->f.qpair != qpair)
+ continue;
+
switch (flow->f.type) {
case FLOW_TYPE_NONE:
assert(false);
@@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
/* Second step: actually free the flows */
+ pthread_rwlock_wrlock(&flow_lock);
flow_foreach_slot(flow) {
switch (flow->f.state) {
case FLOW_STATE_FREE: {
@@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
case FLOW_STATE_INI:
case FLOW_STATE_TGT:
case FLOW_STATE_TYPED:
- /* Incomplete flow at end of cycle */
- assert(false);
+ /* In-progress allocation on another thread */
+ free_head = NULL;
break;
case FLOW_STATE_ACTIVE:
@@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
*last_next = FLOW_MAX;
+ pthread_rwlock_unlock(&flow_lock);
}
/**
diff --git a/flow_table.h b/flow_table.h
index e4ff6f73c35c..f2545390205a 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -72,7 +72,7 @@ extern union flow flowtab[];
(flow) += (flow)->free.n - 1; \
/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
else if ((flow)->f.state != FLOW_STATE_ACTIVE) { \
- flow_err((flow), "Bad flow state during traversal"); \
+ (void)0; /* Differs from bare continue */ \
continue; \
} else
--
2.54.0