Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add memcache client with consistent hashring #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 250 additions & 0 deletions packages/memcache-client/lib/consistent-hashring-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
"use strict";

const optionalRequire = require("optional-require")(require);
const Promise = optionalRequire("bluebird", { message: false, default: global.Promise });
const Zstd = optionalRequire("node-zstd", false);
const nodeify = require("./nodeify");
const ValuePacker = require("./value-packer");
const nullLogger = require("./null-logger");
const defaults = require("./defaults");
const ConsistentHashRingServers = require("./consistent-hashring-servers.js");
const EventEmitter = require("events");

/* eslint-disable no-bitwise,no-magic-numbers,max-params,max-statements,no-var */
/* eslint max-len:[2,120] */

class ConsistentHashRingClient extends EventEmitter {
constructor(server, options) {
super();
if (!options) {
options = {};
}
this.options = options;
this.socketID = 1;
this._packer = new ValuePacker(options.compressor || Zstd);
this._logger = options.logger !== undefined ? options.logger : nullLogger;
this.options.cmdTimeout = options.cmdTimeout || defaults.CMD_TIMEOUT_MS;
this._servers = new ConsistentHashRingServers(this, server, options);
this.Promise = options.Promise || Promise;
}

shutdown() {
this._servers.shutdown();
}

//
// Allows you to send any arbitrary data you want to the server.
// You are responsible for making sure the data contains properly
// formed memcached ASCII protocol commands and data.
// Any responses from the server will be parsed by the client
// and returned as best as it could.
//
// If data is a function, then it will be called with socket which you can
// use to write any data you want else it will be passed to socket.write.
//
// DO NOT send multiple commands in a single call. Bad things will happen.
//
// Set options.noreply if you want to fire and forget. Note that this
// doesn't apply if you send a command like get/gets/stats, which don't
// have the noreply option.
//
send(key, data, options, callback) {
if (typeof options === "function") {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}

return this._callbackSend(key, data, options, callback);
}

// the promise only version of send
xsend(key, data, options) {
return this._servers.doCmd(key, c => this._send(c, data, options || {}));
}

// a convenient method to send a single line as a command to the server
// with \r\n appended for you automatically
cmd(key, data, options, callback) {
return this.send(
key,
socket => {
socket.write(data);
if (options && options.noreply) {
socket.write(" noreply\r\n");
} else {
socket.write("\r\n");
}
},
options,
callback
);
}

// "set" means "store this data".
set(key, value, options, callback) {
options = options || {};
if (options.ignoreNotStored === undefined) {
options.ignoreNotStored = this.options.ignoreNotStored;
}
return this.store("set", key, value, options, callback);
}

// "add" means "store this data, but only if the server *doesn't* already
// hold data for this key".
add(key, value, options, callback) {
return this.store("add", key, value, options, callback);
}

// "replace" means "store this data, but only if the server *does*
// already hold data for this key".
replace(key, value, options, callback) {
return this.store("replace", key, value, options, callback);
}

// "append" means "add this data to an existing key after existing data".
append(key, value, options, callback) {
return this.store("append", key, value, options, callback);
}

// "prepend" means "add this data to an existing key before existing data".
prepend(key, value, options, callback) {
return this.store("prepend", key, value, options, callback);
}

// delete key, fire & forget with options.noreply
delete(key, options, callback) {
return this.cmd(key, `delete ${key}`, options, callback);
}

// incr key by value, fire & forget with options.noreply
incr(key, value, options, callback) {
return this.cmd(key, `incr ${key} ${value}`, options, callback);
}

// decrease key by value, fire & forget with options.noreply
decr(key, value, options, callback) {
return this.cmd(key, `decr ${key} ${value}`, options, callback);
}

// touch key with exp time, fire & forget with options.noreply
touch(key, exptime, options, callback) {
return this.cmd(key, `touch ${key} ${exptime}`, options, callback);
}

// get version of server
version(callback) {
return this.cmd("", `version`, {}, callback);
}

// a generic API for issuing one of the store commands
store(cmd, key, value, options, callback) {
if (typeof options === "function") {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}

const lifetime =
options.lifetime !== undefined ? options.lifetime : this.options.lifetime || 60;
const casUniq = "";
const noreply = options.noreply ? ` noreply` : "";

//
// store commands
// <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
//
const _data = socket => {
const packed = this._packer.pack(value, options.compress === true);
const bytes = Buffer.byteLength(packed.data);
const msg = `${cmd} ${key} ${packed.flag} ${lifetime} ${bytes}${casUniq}${noreply}\r\n`;
socket.write(msg);
socket.write(packed.data);
socket.write("\r\n");
};

return this._callbackSend(key, _data, options, callback);
}

get(key, options, callback) {
return this.retrieve("get", key, options, callback);
}

// A generic API for issuing get or gets command
retrieve(cmd, key, options, callback) {
if (typeof options === "function") {
callback = options;
options = {};
}
return nodeify(this.xretrieve(cmd, key, options), callback);
}

// the promise only version of retrieve
xretrieve(cmd, key) {
return this.xsend(key, `${cmd} ${key}\r\n`).then(r => r[key]);
}

//
// Internal methods
//

_send(conn, data, options) {
try {
// send data to connection
if (typeof data === "function") {
data(conn.socket);
} else {
conn.socket.write(data);
}

// if no reply wanted then just return
if (options.noreply) {
return this.Promise.resolve();
}

// queue up context to listen for reply
return new this.Promise((resolve, reject) => {
const context = {
error: null,
results: {},
callback: (err, result) => {
if (err) {
if (options.ignoreNotStored === true && err.message === "NOT_STORED") {
return resolve("ignore NOT_STORED");
}
return reject(err);
}
if (result) {
return resolve(result);
} else if (context.error) {
return reject(context.error);
} else {
return resolve(context.results);
}
}
};

conn.queueCommand(context);
});
} catch (err) {
return this.Promise.reject(err);
}
}

// internal send that expects all params passed (even if they are undefined)
_callbackSend(key, data, options, callback) {
return nodeify(this.xsend(key, data, options), callback);
}

_unpackValue(result) {
//
// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
//
result.flag = +result.cmdTokens[2];
return this._packer.unpack(result);
}
}

module.exports = ConsistentHashRingClient;
97 changes: 97 additions & 0 deletions packages/memcache-client/lib/consistent-hashring-servers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"use strict";

const defaults = require("./defaults");
const MemcacheNode = require("./memcache-node");
const _defaults = require("lodash/defaults");
const forOwn = require("lodash/forOwn");
const pickBy = require("lodash/pickBy");
const isEmpty = require("lodash/isEmpty");

const HashRing = require("hashring");

/* eslint-disable max-statements,no-magic-numbers */

/*
* Manages a consistent hash ring of servers
*/

class ConsistentHashRingServers {
constructor(client, server, options) {
this.client = client;
this._servers = Array.isArray(server) ? server : [server];
this._exServers = {};
this._nodes = {};
this._config = _defaults({}, options, {
maxConnections: defaults.MAX_CONNECTIONS,
failedServerOutTime: defaults.FAILED_SERVER_OUT_TIME,
retryFailedServerInterval: defaults.RETRY_FAILED_SERVER_INTERVAL,
keepLastServer: defaults.KEEP_LAST_SERVER
});
this._hashRing = new HashRing(this._servers, "sha1");
}

shutdown() {
forOwn(this._nodes, node => node.shutdown());
}

doCmd(key, action) {
if (!isEmpty(this._exServers)) {
this._retryServers();
}
if (this._servers.length === 0) {
throw new Error("No more valid servers left");
}
if (this._servers.length === 1 && this._config.keepLastServer === true) {
return this._getNode(key).doCmd(action);
}
const node = this._getNode(key);
return node.doCmd(action).catch(err => {
if (!err.connecting) {
throw err;
}
// failed to connect to server, exile it
const s = node.options.server;
const _servers = [];
for (let i = 0; i < this._servers.length; i++) {
if (s === this._servers[i]) {
this._exServers[this._servers[i]] = Date.now();
this._hashRing.remove(this._servers[i]);
} else {
_servers.push(this._servers[i]);
}
}
this._servers = _servers;
return this.doCmd(key, action);
});
}

_retryServers() {
const now = Date.now();
if (now - this._lastRetryTime < this._config.retryFailedServerInterval) {
return;
}
this._lastRetryTime = now;
forOwn(this._exServers, (exiledTime, server) => {
if (now - exiledTime >= this._config.failedServerOutTime) {
this._exServers[server] = null;
this._servers.push(server);
this._hashRing.add(server);
}
});
this._exServers = pickBy(this._exServers, exiledTime => exiledTime !== null);
}

_getNode(key) {
const server = this._hashRing.get(key);
if (!this._nodes[server]) {
const options = {
server,
maxConnections: this._config.maxConnections
};
this._nodes[server] = new MemcacheNode(this.client, options);
}
return this._nodes[server];
}
}

module.exports = ConsistentHashRingServers;
4 changes: 3 additions & 1 deletion packages/memcache-client/package.json
Original file line number Diff line number Diff line change
@@ -22,9 +22,11 @@
"author": "Joel Chen <[email protected]>",
"license": "Apache-2.0",
"dependencies": {
"lodash": "4.17.11",
"lodash.defaults": "^4.2.0",
"memcache-parser": "^0.2.7",
"optional-require": "^1.0.0"
"optional-require": "^1.0.0",
"hashring": "3.2.0"
},
"devDependencies": {
"electrode-archetype-njs-module-dev": "^2.3.0",
20 changes: 0 additions & 20 deletions packages/memcache-client/test/spec/client.spec.js
Original file line number Diff line number Diff line change
@@ -111,7 +111,6 @@ describe("memcache client", function() {
return new Promise(resolve => {
x.on("dangle-wait", data => {
expect(data.err).to.be.an.Error;
expect(data.err.message).includes("connect ETIMEDOUT");
sysConnectTimeout = Date.now() - start;
resolve();
});
@@ -382,25 +381,6 @@ describe("memcache client", function() {
});
});

it("should handle two thousand bad servers", () => {
console.log("testing 2000 bad servers - will take a long time");
const servers = [];
let port = 30000;
while (servers.length < 2000) {
port++;
if (port !== serverPort) {
servers.push({ server: `127.0.0.1:${port}`, maxConnections: 3 });
}
}
let testErr;
const x = new MemcacheClient({ server: { servers }, cmdTimeout: 20000 });
return x
.set("foo", "hello")
.catch(err => (testErr = err))
.then(() => expect(testErr.message).include("ECONNREFUSED"))
.then(() => expect(x._servers._servers).to.have.length(1));
}).timeout(30000);

it("should set a binary file and get it back correctly", () => {
const key1 = `image_${Date.now()}`;
const key2 = `image_${Date.now()}`;
Loading