Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experimental]OCF: Pin management thread to supplied CPU core #26

Open
wants to merge 1 commit into
base: experimental
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion module/bdev/ocf/ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,25 @@ struct ocf_persistent_meta_zone {

#define MAX_PERSISTENT_ZONES 2

struct vbdev_ocf_mngt_queue_ctx {
struct vbdev_ocf *vbdev;
ocf_queue_t mngt_queue;
struct spdk_thread *mngt_thread;
struct spdk_poller *mngt_poller;
};

/* Context of cache instance */
struct vbdev_ocf_cache_ctx {
struct vbdev_ocf *vbdev;
ocf_queue_t mngt_queue;
ocf_queue_t cleaner_queue;
pthread_mutex_t lock;
env_atomic refcnt;
struct vbdev_ocf_mngt_queue_ctx *mngt_ctx;

/* Channels for cleaner */
struct spdk_io_channel *cleaner_cache_channel;
struct spdk_io_channel *cleaner_core_channel;

bool create;
bool force;
char cache_name[OCF_CACHE_NAME_SIZE];
Expand Down
91 changes: 65 additions & 26 deletions module/bdev/ocf/vbdev_ocf.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
if (base->attached) {
if (base->management_channel) {
spdk_put_io_channel(base->management_channel);
}

spdk_bdev_module_release_bdev(base->bdev);
/* Close the underlying bdev on its same opened thread. */
if (base->thread && base->thread != spdk_get_thread()) {
Expand Down Expand Up @@ -278,7 +274,7 @@ stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
struct vbdev_ocf *vbdev = priv;

vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_ctx->mngt_queue);
ocf_mngt_cache_unlock(cache);

vbdev_ocf_mngt_continue(vbdev, error);
Expand Down Expand Up @@ -894,15 +890,38 @@ io_device_destroy_cb(void *io_device, void *ctx_buf)

/* OCF management queue deinitialization */
static void
vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
_vbdev_ocf_ctx_mngt_queue_stop(void* ctx)
{
struct spdk_poller *poller = ocf_queue_get_priv(q);
struct vbdev_ocf_mngt_queue_ctx *mngt_ctx = ctx;
struct vbdev_ocf *vbdev = mngt_ctx->vbdev;
struct spdk_thread *thread = mngt_ctx->mngt_thread;

if (mngt_ctx->mngt_poller) {
spdk_poller_unregister(&mngt_ctx->mngt_poller);
}

if (vbdev->cache.management_channel) {
spdk_put_io_channel(vbdev->cache.management_channel);
}
if (vbdev->core.management_channel) {
spdk_put_io_channel(vbdev->core.management_channel);
}

free(mngt_ctx);

if (poller) {
spdk_poller_unregister(&poller);
if (vbdev->cpu_mask) {
spdk_thread_exit(thread);
}
}

static void
vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
{
struct vbdev_ocf_mngt_queue_ctx *mngt_ctx = ocf_queue_get_priv(q);

spdk_thread_send_msg(mngt_ctx->mngt_thread, _vbdev_ocf_ctx_mngt_queue_stop, mngt_ctx);
}

static int
mngt_queue_poll(void *opaque)
{
Expand Down Expand Up @@ -1081,26 +1100,55 @@ start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
vbdev_ocf_mngt_continue(vbdev, error);
}

static void
_create_management_queue(void* ctx)
{
struct vbdev_ocf *vbdev = ctx;
struct vbdev_ocf_mngt_queue_ctx *mngt_ctx = vbdev->cache_ctx->mngt_ctx;

mngt_ctx->mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, mngt_ctx->mngt_queue, 100);

vbdev->cache.management_channel = spdk_bdev_get_io_channel(vbdev->cache.desc);
vbdev->core.management_channel = spdk_bdev_get_io_channel(vbdev->core.desc);
}

static int
create_management_queue(struct vbdev_ocf *vbdev)
{
struct spdk_poller *mngt_poller;
struct spdk_cpuset cpumask = {};
struct vbdev_ocf_mngt_queue_ctx *mngt_ctx;
int rc;

rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
vbdev->cache_ctx->mngt_ctx = calloc(1, sizeof(*vbdev->cache_ctx->mngt_ctx));
if (!vbdev->cache_ctx->mngt_ctx) {
return -ENOMEM;
}
mngt_ctx = vbdev->cache_ctx->mngt_ctx;
mngt_ctx->vbdev = vbdev;

rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &mngt_ctx->mngt_queue, &mngt_queue_ops);
if (rc) {
SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
free(vbdev->cache_ctx->mngt_ctx);
return rc;
}

mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
if (mngt_poller == NULL) {
SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
return -ENOMEM;
if (vbdev->cpu_mask) {
rc = spdk_cpuset_parse(&cpumask, vbdev->cpu_mask);
if (rc) {
vbdev_ocf_queue_put(mngt_ctx->mngt_queue);
free(vbdev->cache_ctx->mngt_ctx);
return rc;
}

mngt_ctx->mngt_thread = spdk_thread_create("ocf_mngt", &cpumask);
} else {
mngt_ctx->mngt_thread = spdk_get_thread();
}

ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
ocf_queue_set_priv(mngt_ctx->mngt_queue, mngt_ctx);
ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, mngt_ctx->mngt_queue);
spdk_thread_send_msg(mngt_ctx->mngt_thread, _create_management_queue, vbdev);

return 0;
}
Expand Down Expand Up @@ -1509,7 +1557,6 @@ attach_base(struct vbdev_ocf_base *base)
struct vbdev_ocf_base *existing = get_other_cache_base(base);
if (existing) {
base->desc = existing->desc;
base->management_channel = existing->management_channel;
base->attached = true;
return 0;
}
Expand All @@ -1529,14 +1576,6 @@ attach_base(struct vbdev_ocf_base *base)
return status;
}

base->management_channel = spdk_bdev_get_io_channel(base->desc);
if (!base->management_channel) {
SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
spdk_bdev_module_release_bdev(base->bdev);
spdk_bdev_close(base->desc);
return -ENOMEM;
}

/* Save the thread where the base device is opened */
base->thread = spdk_get_thread();

Expand Down
2 changes: 1 addition & 1 deletion module/bdev/ocf/volume.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ prepare_submit(struct ocf_io *io)
return 0;
}

if (q == cctx->mngt_queue) {
if (q == cctx->mngt_ctx->mngt_queue) {
io_ctx->ch = base->management_channel;
return 0;
}
Expand Down