Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added queue.getFps() function and tests for this function #1229

Open
wants to merge 2 commits into
base: v3_develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/python/src/MessageQueueBindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void MessageQueueBindings::bind(pybind11::module& m, void* pCallstack) {
.def("getMaxSize", &MessageQueue::getMaxSize, DOC(dai, MessageQueue, getMaxSize))
.def("getSize", &MessageQueue::getSize, DOC(dai, MessageQueue, getSize))
.def("isFull", &MessageQueue::isFull, DOC(dai, MessageQueue, isFull))
.def("getFps", &MessageQueue::getFps, DOC(dai, MessageQueue, getFps))
.def("addCallback", addCallbackLambda, py::arg("callback"), DOC(dai, MessageQueue, addCallback))
.def("removeCallback", &MessageQueue::removeCallback, py::arg("callbackId"), DOC(dai, MessageQueue, removeCallback))
.def("has", static_cast<bool (MessageQueue::*)()>(&MessageQueue::has), DOC(dai, MessageQueue, has))
Expand Down
12 changes: 12 additions & 0 deletions include/depthai/pipeline/MessageQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// std
#include <memory>
#include <vector>
#include <deque>
#include <chrono>

// project
#include "depthai/pipeline/datatype/ADatatype.hpp"
Expand All @@ -26,10 +28,13 @@ class MessageQueue : public std::enable_shared_from_this<MessageQueue> {

private:
static constexpr auto CLOSED_QUEUE_MESSAGE = "MessageQueue was closed";
static constexpr size_t FPS_QUEUE_MAX_SIZE = 10;
LockingQueue<std::shared_ptr<ADatatype>> queue;
std::string name;
std::mutex callbacksMtx;
std::mutex fpsMtx;
std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;
std::deque<std::chrono::steady_clock::time_point> fpsQueue;
CallbackId uniqueCallbackId{0};
void callCallbacks(std::shared_ptr<ADatatype> msg);

Expand Down Expand Up @@ -128,6 +133,13 @@ class MessageQueue : public std::enable_shared_from_this<MessageQueue> {
*/
unsigned int isFull() const;

/**
* Gets current FPS of the queue
*
* @returns Current FPS
*/
float getFps();

/**
* Adds a callback on message received
*
Expand Down
50 changes: 50 additions & 0 deletions src/pipeline/MessageQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// std
#include <chrono>
#include <deque>
#include <iostream>

// project
Expand Down Expand Up @@ -70,6 +71,37 @@ unsigned int MessageQueue::isFull() const {
return queue.isFull();
}

float MessageQueue::getFps() {
std::unique_lock<std::mutex> lock(fpsMtx);

// Get current time
auto now = std::chrono::steady_clock::now();
auto threshold = now - std::chrono::seconds(2);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind the treshold? I would set a max queue size instead so it's more consistent for low and high FPS

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@moratom well if you have script node and queue suddenly stops receiving messages, it would still report last FPS. Instead this logic removes messages older than 2sec so you get real FPS.


// Remove timestamps older than 2 seconds
while (!fpsQueue.empty() && fpsQueue.front() < threshold) {
fpsQueue.pop_front();
}

// If fewer than 2 timestamps are in queue, not enough data to compute FPS
if(fpsQueue.size() < 2) {
return 0.0;
}

auto oldest = fpsQueue.front();
auto newest = fpsQueue.back();
auto diff = std::chrono::duration<float>(newest - oldest).count(); // seconds

// If diff is extremely small, avoid dividing by zero
if(diff <= 0.0) {
return 0.0;
}
// Using (N - 1) frames over 'diff' seconds
// or (N) messages over 'diff' seconds—both approaches are common.
// This calculates how many frames we got over that time window.
return (fpsQueue.size() - 1) / diff;
}

int MessageQueue::addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)> callback) {
// Lock first
std::unique_lock<std::mutex> lock(callbacksMtx);
Expand Down Expand Up @@ -111,6 +143,16 @@ void MessageQueue::send(const std::shared_ptr<ADatatype>& msg) {
callCallbacks(msg);
auto queueNotClosed = queue.push(msg);
if(!queueNotClosed) throw QueueException(CLOSED_QUEUE_MESSAGE);

// Record the timestamp for FPS calculation
{
std::unique_lock<std::mutex> lock(fpsMtx);
auto now = std::chrono::steady_clock::now();
fpsQueue.push_back(now);
if(fpsQueue.size() > FPS_QUEUE_MAX_SIZE) {
fpsQueue.pop_front();
}
}
}

bool MessageQueue::send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout) {
Expand All @@ -119,6 +161,14 @@ bool MessageQueue::send(const std::shared_ptr<ADatatype>& msg, std::chrono::mill
if(queue.isDestroyed()) {
throw QueueException(CLOSED_QUEUE_MESSAGE);
}
{
std::unique_lock<std::mutex> lock(fpsMtx);
auto now = std::chrono::steady_clock::now();
fpsQueue.push_back(now);
if(fpsQueue.size() > FPS_QUEUE_MAX_SIZE) {
fpsQueue.pop_front();
}
}
return queue.tryWaitAndPush(msg, timeout);
}

Expand Down
36 changes: 36 additions & 0 deletions tests/src/onhost_tests/message_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,39 @@ TEST_CASE("Multi callbacks", "[MessageQueue]") {
REQUIRE(callbackCount1 == 1);
REQUIRE(callbackCount2 == 1);
}

TEST_CASE("MessageQueue - FPS Calculation", "[MessageQueue]") {
// Create a non-blocking queue to avoid blocking when the underlying queue is full
MessageQueue queue(10, false);

// Ensure FPS starts at 0
REQUIRE(queue.getFps() == 0.0);

// Send 10 messages with a small delay
constexpr int NUM_MESSAGES = 10;
constexpr int DELAY_MS = 50; // 50ms delay between messages

for (int i = 0; i < NUM_MESSAGES; ++i) {
auto msg = std::make_shared<ADatatype>();
queue.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(DELAY_MS));
}

// Compute expected FPS: 10 messages over roughly 450ms ~20 FPS
double fps = queue.getFps();
REQUIRE(fps > 15.0); // Should be around 20 FPS
REQUIRE(fps < 30.0); // Upper bound check

// Send one more message, verify FPS updates consistently
auto msg = std::make_shared<ADatatype>();
queue.send(msg);
fps = queue.getFps();
REQUIRE(fps > 15.0); // Still consistent with recent message rate

// Wait long enough (more than 2 seconds) so that old timestamps drop out,
// causing the FPS to be recalculated from fewer (or no) messages.
std::this_thread::sleep_for(std::chrono::milliseconds(2100));
fps = queue.getFps();
// With fewer than 2 messages in the recent window, getFps() should return 0.0
REQUIRE(fps == 0.0);
}
Loading