Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a more safe read function and variadic functions of DoublyBufferedData #2898

Merged
merged 4 commits into from
Mar 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Support variadic functions of DoublyBufferedData
chenBright committed Feb 24, 2025
commit 4517d04df542490e28a04e3878cd97e96f58069b
123 changes: 16 additions & 107 deletions src/butil/containers/doubly_buffered_data.h
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ class DoublyBufferedData {
// Returns 0 on success, -1 otherwise.
int Read(ScopedPtr* ptr);

// fn is `int Fn(const T& foreground)', will be called with foreground instance.
// `fn(const T&)' will be called with foreground instance.
// This function is not blocked by Read() and Modify() in other threads.
// Returns 0 on success, otherwise on error.
template<typename Fn>
@@ -134,78 +134,15 @@ class DoublyBufferedData {
// NOTE: Call same series of fn to different equivalent instances should
// result in equivalent instances, otherwise foreground and background
// instance will be inconsistent.
template <typename Fn> size_t Modify(Fn& fn);
template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&);
template <typename Fn, typename Arg1, typename Arg2>
size_t Modify(Fn& fn, const Arg1&, const Arg2&);
template <typename Fn, typename... Args>
size_t Modify(Fn&& fn, Args&&... args);

// fn(T& background, const T& foreground, ...) will be called to background
// and foreground instances respectively.
template <typename Fn> size_t ModifyWithForeground(Fn& fn);
template <typename Fn, typename Arg1>
size_t ModifyWithForeground(Fn& fn, const Arg1&);
template <typename Fn, typename Arg1, typename Arg2>
size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&);
template <typename Fn, typename... Args>
size_t ModifyWithForeground(Fn&& fn, Args&&... args);

private:
template <typename Fn>
struct WithFG0 {
WithFG0(Fn& fn, T* data) : _fn(fn), _data(data) { }
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data]);
}
private:
Fn& _fn;
T* _data;
};

template <typename Fn, typename Arg1>
struct WithFG1 {
WithFG1(Fn& fn, T* data, const Arg1& arg1)
: _fn(fn), _data(data), _arg1(arg1) {}
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data], _arg1);
}
private:
Fn& _fn;
T* _data;
const Arg1& _arg1;
};

template <typename Fn, typename Arg1, typename Arg2>
struct WithFG2 {
WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
: _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
}
private:
Fn& _fn;
T* _data;
const Arg1& _arg1;
const Arg2& _arg2;
};

template <typename Fn, typename Arg1>
struct Closure1 {
Closure1(Fn& fn, const Arg1& arg1) : _fn(fn), _arg1(arg1) {}
size_t operator()(T& bg) { return _fn(bg, _arg1); }
private:
Fn& _fn;
const Arg1& _arg1;
};

template <typename Fn, typename Arg1, typename Arg2>
struct Closure2 {
Closure2(Fn& fn, const Arg1& arg1, const Arg2& arg2)
: _fn(fn), _arg1(arg1), _arg2(arg2) {}
size_t operator()(T& bg) { return _fn(bg, _arg1, _arg2); }
private:
Fn& _fn;
const Arg1& _arg1;
const Arg2& _arg2;
};

const T* UnsafeRead() const {
return _data + _index.load(butil::memory_order_acquire);
}
@@ -259,7 +196,8 @@ template <typename T, typename TLS, bool AllowBthreadSuspended>
class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup {
public:
const static size_t RAW_BLOCK_SIZE = 4096;
const static size_t ELEMENTS_PER_BLOCK = RAW_BLOCK_SIZE / sizeof(Wrapper) > 0 ? RAW_BLOCK_SIZE / sizeof(Wrapper) : 1;
const static size_t ELEMENTS_PER_BLOCK =
RAW_BLOCK_SIZE / sizeof(Wrapper) > 0 ? RAW_BLOCK_SIZE / sizeof(Wrapper) : 1;

struct BAIDU_CACHELINE_ALIGNMENT ThreadBlock {
inline DoublyBufferedData::Wrapper* at(size_t offset) {
@@ -590,8 +528,8 @@ int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(Fn&& fn) {
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn&& fn, Args&&... args) {
// _modify_mutex sequences modifications. Using a separate mutex rather
// than _wrappers_mutex is to avoid blocking threads calling
// AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
@@ -600,7 +538,7 @@ size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
int bg_index = !_index.load(butil::memory_order_relaxed);
// background instance is not accessed by other threads, being safe to
// modify.
const size_t ret = fn(_data[bg_index]);
const size_t ret = fn(_data[bg_index], std::forward<Args>(args)...);
if (!ret) {
return 0;
}
@@ -626,46 +564,17 @@ size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
}
}

const size_t ret2 = fn(_data[bg_index]);
const size_t ret2 = fn(_data[bg_index], std::forward<Args>(args)...);
CHECK_EQ(ret2, ret) << "index=" << _index.load(butil::memory_order_relaxed);
return ret2;
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn, const Arg1& arg1) {
Closure1<Fn, Arg1> c(fn, arg1);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn& fn) {
WithFG0<Fn> c(fn, _data);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
WithFG1<Fn, Arg1> c(fn, _data, arg1);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
return Modify(c);
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn&& fn, Args&&... args) {
return Modify([this, fn](T& bg, Args&&... args) {
return fn(bg, (const T&)_data[&bg == _data], std::forward<Args>(args)...);
}, std::forward<Args>(args)...);
}

} // namespace butil
6 changes: 5 additions & 1 deletion test/brpc_load_balancer_unittest.cpp
Original file line number Diff line number Diff line change
@@ -123,10 +123,14 @@ void test_doubly_buffered_data() {
}

d.Modify(AddN, 10);
d.Modify([](Foo& f, int n) -> size_t {
f.x += n;
return 1;
}, 10);
{
typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(10, ptr->x);
ASSERT_EQ(20, ptr->x);
}
}