From e8ce6b02e68ca7606c2bee475d13dcbc5b70f736 Mon Sep 17 00:00:00 2001
From: "helei.sig11" <helei.sig11@bytedance.com>
Date: Mon, 20 Mar 2023 14:50:11 +0800
Subject: [PATCH 1/2] brpc: add bson support

Signed-off-by: helei.sig11 <helei.sig11@bytedance.com>
---
 .../install-essential-dependences/action.yml  |   2 +-
 .gitignore                                    |   1 +
 CMakeLists.txt                                |   7 +
 Makefile                                      |   1 +
 config_brpc.sh                                |  15 +-
 src/butil/bson_util.cc                        | 244 ++++++++++++++++++
 src/butil/bson_util.h                         |  79 ++++++
 7 files changed, 344 insertions(+), 5 deletions(-)
 create mode 100644 src/butil/bson_util.cc
 create mode 100644 src/butil/bson_util.h

diff --git a/.github/actions/install-essential-dependences/action.yml b/.github/actions/install-essential-dependences/action.yml
index 4b3249d89a..1fbc6aa319 100644
--- a/.github/actions/install-essential-dependences/action.yml
+++ b/.github/actions/install-essential-dependences/action.yml
@@ -3,5 +3,5 @@ runs:
   steps:
     - run: ulimit -c unlimited -S && sudo bash -c "echo 'core.%e.%p' > /proc/sys/kernel/core_pattern"
       shell: bash
-    - run: sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev
+    - run: sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev libbson-dev
       shell: bash
diff --git a/.gitignore b/.gitignore
index bdd210096b..5977716275 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@
 *.so
 *.dylib
 *.rej
+*.patch
 /output
 /test/output
 build/
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 35365b270b..c1f7fc43a7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -152,6 +152,7 @@ endif()
 
 find_package(Protobuf REQUIRED)
 find_package(Threads REQUIRED)
+find_package (bson-1.0 1.7 REQUIRED)
 
 find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
 find_library(LEVELDB_LIB NAMES leveldb)
@@ -210,11 +211,16 @@ endif()
 
 find_package(OpenSSL)
 
+find_package(bson-1.0 REQUIRED)
+
+get_target_property(BSON_INCLUDE_DIRS mongo::bson_shared INTERFACE_INCLUDE_DIRECTORIES)
+
 include_directories(
         ${GFLAGS_INCLUDE_PATH}
         ${PROTOBUF_INCLUDE_DIRS}
         ${LEVELDB_INCLUDE_PATH}
         ${OPENSSL_INCLUDE_DIR}
+        ${BSON_INCLUDE_DIRS}
         )
 
 set(DYNAMIC_LIB
@@ -293,6 +299,7 @@ set(BUTIL_SOURCES
     ${PROJECT_SOURCE_DIR}/src/butil/atomicops_internals_x86_gcc.cc
     ${PROJECT_SOURCE_DIR}/src/butil/base64.cc
     ${PROJECT_SOURCE_DIR}/src/butil/big_endian.cc
+    ${PROJECT_SOURCE_DIR}/src/butil/bson_util.cc
     ${PROJECT_SOURCE_DIR}/src/butil/cpu.cc
     ${PROJECT_SOURCE_DIR}/src/butil/debug/alias.cc
     ${PROJECT_SOURCE_DIR}/src/butil/debug/asan_invalid_access.cc
diff --git a/Makefile b/Makefile
index 574c63bbfb..a3a5430df3 100644
--- a/Makefile
+++ b/Makefile
@@ -69,6 +69,7 @@ BUTIL_SOURCES = \
     src/butil/atomicops_internals_x86_gcc.cc \
     src/butil/base64.cc \
     src/butil/big_endian.cc \
+	src/butil/bson_util.cc \
     src/butil/cpu.cc \
     src/butil/debug/alias.cc \
     src/butil/debug/asan_invalid_access.cc \
diff --git a/config_brpc.sh b/config_brpc.sh
index eabfda3f20..8bdde1ed5a 100755
--- a/config_brpc.sh
+++ b/config_brpc.sh
@@ -170,6 +170,7 @@ OPENSSL_LIB=$(find_dir_of_lib ssl)
 # Inconvenient to check these headers in baidu-internal
 #PTHREAD_HDR=$(find_dir_of_header_or_die pthread.h)
 OPENSSL_HDR=$(find_dir_of_header_or_die openssl/ssl.h mesalink/openssl/ssl.h)
+BSON_HDR=$(find_dir_of_header bson.h)
 
 if [ $WITH_MESALINK != 0 ]; then
     MESALINK_HDR=$(find_dir_of_header_or_die mesalink/openssl/ssl.h)
@@ -199,6 +200,8 @@ if [ "$SYSTEM" = "Darwin" ]; then
 	DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -Wl,-U,_RegisterThriftProtocol"
 fi
 append_linking() {
+    #convert lib-a.b.c to lib_a_b_c
+    local lib_name=`echo $2 | sed 's/-\|\./_/g'`
     if [ -f $1/lib${2}.a ]; then
         if [ "$SYSTEM" = "Darwin" ]; then
             # *.a must be explicitly specified in clang
@@ -206,10 +209,10 @@ append_linking() {
         else
             STATIC_LINKINGS="$STATIC_LINKINGS -l$2"
         fi
-        export STATICALLY_LINKED_$2=1
+        export STATICALLY_LINKED_$lib_name=1
     else
         DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -l$2"
-        export STATICALLY_LINKED_$2=0
+        export STATICALLY_LINKED_$lib_name=0
     fi
 }
 
@@ -219,6 +222,10 @@ append_linking $GFLAGS_LIB gflags
 PROTOBUF_LIB=$(find_dir_of_lib_or_die protobuf)
 append_linking $PROTOBUF_LIB protobuf
 
+# namespace c, grep it from source.
+BSON_LIB=$(find_dir_of_lib_or_die bson-1.0)
+append_linking $BSON_LIB bson-1.0
+
 LEVELDB_LIB=$(find_dir_of_lib_or_die leveldb)
 # required by leveldb
 if [ -f $LEVELDB_LIB/libleveldb.a ]; then
@@ -263,7 +270,7 @@ fi
 PROTOBUF_HDR=$(find_dir_of_header_or_die google/protobuf/message.h)
 LEVELDB_HDR=$(find_dir_of_header_or_die leveldb/db.h)
 
-HDRS=$($ECHO "$GFLAGS_HDR\n$PROTOBUF_HDR\n$LEVELDB_HDR\n$OPENSSL_HDR" | sort | uniq)
+HDRS=$($ECHO "$GFLAGS_HDR\n$PROTOBUF_HDR\n$LEVELDB_HDR\n$OPENSSL_HDR\n$BSON_HDR" | sort | uniq)
 LIBS=$($ECHO "$GFLAGS_LIB\n$PROTOBUF_LIB\n$LEVELDB_LIB\n$OPENSSL_LIB\n$SNAPPY_LIB" | sort | uniq)
 
 absent_in_the_list() {
@@ -373,7 +380,7 @@ fi
 append_to_output "CPPFLAGS=${CPPFLAGS}"
 append_to_output "# without the flag, linux+arm64 may crash due to folding on TLS.
 ifeq (\$(CC),gcc)
-  ifeq (\$(shell uname -p),aarch64) 
+  ifeq (\$(shell uname -p),aarch64)
     CPPFLAGS+=-fno-gcse
   endif
 endif
diff --git a/src/butil/bson_util.cc b/src/butil/bson_util.cc
new file mode 100644
index 0000000000..0e42212c11
--- /dev/null
+++ b/src/butil/bson_util.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "butil/bson_util.h"
+
+#include <cassert>
+
+#include "butil/logging.h"
+
+static ssize_t BsonReaderWrapper(void* handle, void* buf, size_t count) {
+    butil::IOBuf* iobuf = static_cast<butil::IOBuf*>(handle);
+    return iobuf->cutn(buf, count);
+}
+
+namespace butil {
+namespace bson {
+
+BsonEnumerator::BsonEnumerator(IOBuf* iobuf)
+    : _buf(iobuf)
+    , _reader(::bson_reader_new_from_handle(_buf, BsonReaderWrapper, NULL)) { 
+    if (!_reader) {
+        _has_error = true;
+    }
+}
+
+bool BsonEnumerator::HasError() {
+    return _has_error;
+}
+
+const bson_t* BsonEnumerator::Next() {
+    if (_has_error) {
+        return nullptr;
+    }
+
+    bool eof;
+    const bson_t* doc = bson_reader_read(_reader, &eof);
+    if (!doc && !eof) {
+        _has_error = true;
+    }
+    return doc;
+}
+
+BsonEnumerator::~BsonEnumerator() {
+    if (_reader) {
+        bson_reader_destroy(_reader);
+    }
+}
+
+UniqueBsonPtr ExtractBsonFromIOBuf(IOBuf& iobuf) {
+    uint32_t bson_length;
+    const size_t n = iobuf.copy_to(&bson_length, sizeof(bson_length));
+    if (n < sizeof(bson_length) || iobuf.size() < bson_length + sizeof(bson_length)) {
+        return nullptr;
+    }
+    std::unique_ptr<uint8_t[]> buffer(new uint8_t[bson_length]);
+    iobuf.copy_to(buffer.get(), bson_length);
+    return UniqueBsonPtr(bson_new_from_data(buffer.get(), bson_length));
+}
+
+bool bson_get_double(bson_t* doc, const char *key, double *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_DOUBLE(&iter)) {
+        return false;
+    }
+    *value = bson_iter_double(&iter);
+    return true;
+}
+
+bool bson_get_int32(bson_t* doc, const char *key, int32_t *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_INT32(&iter)) {
+        return false;
+    }
+    *value = bson_iter_int32(&iter);
+    return true;
+}
+
+bool bson_get_int64(bson_t* doc, const char *key, int64_t *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_INT64(&iter)) {
+        return false;
+    }
+    *value = bson_iter_int64(&iter);
+    return true;
+}
+
+bool bson_get_str(bson_t* doc, const char *key, std::string *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_UTF8(&iter)) {
+        return false;
+    }
+    uint32_t length = 0;
+    const char *str = bson_iter_utf8(&iter, &length);
+    if (!str) {
+        return false;
+    } else {
+        *value = std::string(str, length);
+        return true;
+    }
+}
+
+bool bson_get_doc(bson_t* doc, const char *key, bson_t* *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_DOCUMENT(&iter)) {
+        return false;
+    }
+    uint32_t length = 0;
+    const uint8_t *document_str = nullptr;
+    bson_iter_document(&iter, &length, &document_str);
+    bson_t *value_doc = bson_new_from_data(document_str, length);
+    if (!value_doc) {
+        return false;
+    }
+    *value = value_doc;
+    return true;
+}
+
+bool bson_get_array(bson_t* doc, const char *key, std::vector<bson_t*> *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_ARRAY(&iter)) {
+        return false;
+    }
+    uint32_t length = 0;
+    const uint8_t *array_str = nullptr;
+    bson_iter_array(&iter, &length, &array_str);
+    bson_t bson_array;  // read only
+    bool array_init = bson_init_static(&bson_array, array_str, length);
+    if (!array_init) {
+        return false;
+    }
+    bson_iter_t array_iter;
+    bool r = bson_iter_init(&array_iter, &bson_array);
+    if (!r) {
+        return false;
+    }
+    while (bson_iter_next(&array_iter)) {
+        if (!BSON_ITER_HOLDS_DOCUMENT(&array_iter)) {
+            continue;
+        }
+        uint32_t doc_length = 0;
+        const uint8_t *document_str = nullptr;
+        bson_iter_document(&array_iter, &doc_length, &document_str);
+        bson_t *array_element_ptr = bson_new_from_data(document_str, doc_length);
+        if (!array_element_ptr) {
+            return false;
+        }
+        value->push_back(array_element_ptr);
+    }
+    return true;
+}
+
+bool bson_has_oid(bson_t* doc) {
+    assert(doc);
+    bson_iter_t iter;
+    const char *oid = "_id";
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, oid) ||
+        !BSON_ITER_HOLDS_OID(&iter)) {
+        return false;
+    }
+    return true;
+}
+
+bool bson_get_oid(bson_t* doc, const char *key, bson_oid_t *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_OID(&iter)) {
+        return false;
+    }
+    const bson_oid_t *oid = bson_iter_oid(&iter);
+    if (!oid) {
+        return false;
+    } else {
+        *value = *oid;
+        return true;
+    }
+}
+
+bool bson_get_bool(bson_t* doc, const char *key, bool *value) {
+    assert(doc);
+    assert(key);
+    assert(value);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key) ||
+        !BSON_ITER_HOLDS_BOOL(&iter)) {
+        return false;
+    }
+    *value = bson_iter_bool(&iter);
+    return true;
+}
+
+std::pair<bool, bson_type_t> bson_get_type(bson_t* doc, const char *key) {
+    assert(doc);
+    assert(key);
+    bson_iter_t iter;
+    if (!bson_iter_init(&iter, doc) || !bson_iter_find(&iter, key)) {
+        return std::make_pair(false, BSON_TYPE_EOD);
+    } else {
+        return std::make_pair(true, bson_iter_type(&iter));
+    }
+}
+
+}  // namespace bson
+}  // namespace butil
diff --git a/src/butil/bson_util.h b/src/butil/bson_util.h
new file mode 100644
index 0000000000..9badfff4c2
--- /dev/null
+++ b/src/butil/bson_util.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BUTIL_BSON_UTIL_H
+#define BUTIL_BSON_UTIL_H
+
+#include <bson.h>
+
+#include <stdint.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "butil/iobuf.h"
+
+namespace butil {
+namespace bson {
+
+struct BsonDeleter {
+    void operator()(bson_t* doc) {
+        bson_destroy(doc);
+    } 
+};
+
+class BsonEnumerator {
+public:
+    BsonEnumerator(IOBuf* iobuf);
+    bool HasError();
+    const bson_t* Next();
+    ~BsonEnumerator();
+
+private:
+    bool _has_error;
+    IOBuf* _buf;
+    bson_reader_t* _reader;
+};
+
+typedef std::unique_ptr<bson_t, BsonDeleter> UniqueBsonPtr;
+
+UniqueBsonPtr ExtractBsonFromIOBuf(IOBuf& iobuf);
+
+bool bson_get_double(bson_t* doc, const char *key, double* value);
+
+bool bson_get_int32(bson_t* doc, const char *key, int32_t *value);
+
+bool bson_get_int64(bson_t* doc, const char *key, int64_t *value);
+
+bool bson_get_str(bson_t* doc, const char *key, std::string *value);
+
+bool bson_get_doc(bson_t* doc, const char *key, bson_t* *value);
+
+bool bson_get_array(bson_t* doc, const char *key, std::vector<bson_t*> *value);
+
+bool bson_has_oid(bson_t* doc);
+
+bool bson_get_oid(bson_t* doc, const char *key, bson_oid_t *value);
+
+bool bson_get_bool(bson_t* doc, const char *key, bool *value);
+
+std::pair<bool, bson_type_t> bson_get_type(bson_t* doc, const char *key);
+
+}  // namespace bson
+}  // namespace butil
+
+#endif  // BUTIL_BSON_UTIL_H

From ee12867d5b007678bca95d1a8408adf013c1a191 Mon Sep 17 00:00:00 2001
From: "helei.sig11" <helei.sig11@bytedance.com>
Date: Mon, 20 Mar 2023 20:50:00 +0800
Subject: [PATCH 2/2] brpc: support mongo client

Signed-off-by: helei.sig11 <helei.sig11@bytedance.com>
---
 example/mongo_c++/Makefile         |  69 +++++++
 example/mongo_c++/mongo_press.cpp  | 107 ++++++++++
 src/brpc/global.cpp                |   8 +-
 src/brpc/mongo.cpp                 | 316 +++++++++++++++++++++++++++++
 src/brpc/mongo.h                   | 145 +++++++++++++
 src/brpc/mongo_head.h              |  16 +-
 src/brpc/policy/mongo.proto        |  36 +---
 src/brpc/policy/mongo_protocol.cpp | 230 ++++++++++++++++-----
 src/brpc/policy/mongo_protocol.h   |  23 +++
 src/brpc/proto_base.proto          |   2 +
 src/butil/bson_util.cc             |   2 +-
 11 files changed, 869 insertions(+), 85 deletions(-)
 create mode 100644 example/mongo_c++/Makefile
 create mode 100644 example/mongo_c++/mongo_press.cpp
 create mode 100644 src/brpc/mongo.cpp
 create mode 100644 src/brpc/mongo.h

diff --git a/example/mongo_c++/Makefile b/example/mongo_c++/Makefile
new file mode 100644
index 0000000000..42ab5cdc3b
--- /dev/null
+++ b/example/mongo_c++/Makefile
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+BRPC_PATH = ../../
+include $(BRPC_PATH)/config.mk
+CXXFLAGS+=$(CPPFLAGS) -std=c++0x -DNDEBUG -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer
+HDRS+=$(BRPC_PATH)/output/include
+LIBS+=$(BRPC_PATH)/output/lib
+HDRPATHS = $(addprefix -I, $(HDRS))
+LIBPATHS = $(addprefix -L, $(LIBS))
+COMMA=,
+SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))
+
+PRESS_SOURCES = mongo_press.cpp
+
+PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES)))
+
+ifeq ($(SYSTEM),Darwin)
+ ifneq ("$(LINK_SO)", "")
+	STATIC_LINKINGS += -lbrpc
+ else
+	# *.a must be explicitly specified in clang
+	STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
+ endif
+	LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+	LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+else ifeq ($(SYSTEM),Linux)
+	STATIC_LINKINGS += -lbrpc
+	LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+	LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
+endif
+
+.PHONY:all
+all: mongo_press
+
+.PHONY:clean
+clean:
+	@echo "> Cleaning"
+	rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS)
+
+mongo_press:$(PRESS_OBJS)
+	@echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+	$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+	$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+%.o:%.cpp
+	@echo "> Compiling $@"
+	$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
+
+%.o:%.cc
+	@echo "> Compiling $@"
+	$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
diff --git a/example/mongo_c++/mongo_press.cpp b/example/mongo_c++/mongo_press.cpp
new file mode 100644
index 0000000000..1855e5d92b
--- /dev/null
+++ b/example/mongo_c++/mongo_press.cpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A multi-threaded client getting keys from a redis-server constantly.
+
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <butil/string_printf.h>
+#include <bvar/bvar.h>
+#include <brpc/channel.h>
+#include <brpc/server.h>
+#include <brpc/mongo.h>
+
+DEFINE_string(connection_type, "single",
+              "Connection type. Available values: pooled, short");
+DEFINE_string(server, "127.0.0.1", "IP Address of server");
+DEFINE_int32(port, 27017, "Port of server");
+DEFINE_int32(timeout_ms, 5000, "RPC timeout in milliseconds");
+DEFINE_int32(connect_timeout_ms, 5000, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_string(collection, "test_collection", "collection name");
+DEFINE_string(db, "test_db", "database name");
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // A Channel represents a communication line to a Server. Notice that
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+
+    // Initialize the channel, NULL means using default options.
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_MONGO;
+    options.connection_type = FLAGS_connection_type;
+    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    brpc::Controller cntl;
+    butil::bson::UniqueBsonPtr command(
+        BCON_NEW("insert", BCON_UTF8(FLAGS_collection.c_str()),
+                 "$db", BCON_UTF8(FLAGS_db.c_str()),
+                 "comment", BCON_UTF8("brpc mongo press")));
+
+    brpc::MongoMessage req;
+    brpc::MongoMessage resp;
+    req.set_body(std::move(command));
+    req.set_key("documents");
+    for (size_t i = 0; i < 10; i++) {
+        char user_id[64];
+        char user_name[64];
+        ::snprintf(user_id, sizeof(user_id), "user-%lu", i);
+        ::snprintf(user_name, sizeof(user_name), "user-name-%lu", i);
+        req.add_doc_sequence(butil::bson::UniqueBsonPtr(BCON_NEW(
+                       "user", BCON_UTF8(user_id),
+                       "_id", BCON_INT32(i),
+                       "user_name", BCON_UTF8(user_name))));
+    }
+    LOG(INFO) << "MongoRequest: " << req;
+    channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr);
+
+    if (!cntl.Failed()) {
+        LOG(INFO) << "OK: \n" << req << "\n" <<  resp;
+    } else {
+        LOG(INFO) << "Failed: \n" << req << "\n" <<  resp;
+        LOG(INFO) << cntl.ErrorText();
+        return 0;
+    }
+
+    while (!brpc::IsAskedToQuit()) {
+        brpc::Controller cntl;
+        brpc::MongoMessage req;
+        brpc::MongoMessage resp;
+        butil::bson::UniqueBsonPtr command(
+            BCON_NEW("find", BCON_UTF8(FLAGS_collection.c_str()),
+                     "$db", BCON_UTF8(FLAGS_db.c_str()),
+                     "comment", BCON_UTF8("brpc mongo press query")));
+        req.set_body(std::move(command));
+        channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr);
+        if (!cntl.Failed()) {
+            LOG(INFO) << "OK: \n" << req << "\n" <<  resp;
+        } else {
+            LOG(INFO) << cntl.ErrorText();
+        }
+        bthread_usleep(1000*1000);
+    }
+    return 0;
+}
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index 30c2f1a3b9..6f507f4a25 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -509,10 +509,12 @@ static void GlobalInitializeOrDieImpl() {
     }
 
     Protocol mongo_protocol = { ParseMongoMessage,
-                                NULL, NULL,
-                                ProcessMongoRequest, NULL,
+                                SerializeMongoRequest,
+                                PackMongoRequest,
+                                ProcessMongoRequest,
+                                ProcessMongoResponse,
                                 NULL, NULL, NULL,
-                                CONNECTION_TYPE_POOLED, "mongo" };
+                                CONNECTION_TYPE_ALL, "mongo" };
     if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) {
         exit(1);
     }
diff --git a/src/brpc/mongo.cpp b/src/brpc/mongo.cpp
new file mode 100644
index 0000000000..dbe34abf56
--- /dev/null
+++ b/src/brpc/mongo.cpp
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/mongo.h"
+
+#include <google/protobuf/reflection_ops.h>     // ReflectionOps::Merge
+
+#include "brpc/proto_base.pb.h"
+#include "butil/logging.h"
+#include "mongo_head.h"
+
+namespace brpc {
+
+bool MongoMessage::SerializeToIOBuf(butil::IOBuf *buf) const {
+    // flag bits
+    uint32_t flag_bits = butil::ByteSwapToLE32(_flag_bits);
+    buf->append(&flag_bits, sizeof(flag_bits));
+
+    // body section
+    uint8_t payload_type = MONGO_PAYLOAD_TYPE_BODY;
+    buf->append(&payload_type, sizeof(payload_type));
+    buf->append(bson_get_data(_body.get()), _body->len);
+
+    // data section
+    if (_key.empty() || _doc_sequence.empty()) {
+        return true;
+    }
+    if (bson_has_field(_body.get(), _key.c_str())) {
+        return false;
+    }
+
+    payload_type = MONGO_PAYLOAD_TYPE_DOC_SEQUENCE;
+    uint8_t section_length = 4 + _key.size() + 1;
+    for (const UniqueBsonPtr& doc : _doc_sequence) {
+        section_length += doc->len;
+    }
+    section_length = butil::ByteSwapToLE32(section_length);
+    buf->append(&payload_type, sizeof(payload_type));
+    buf->append(&section_length, sizeof(section_length));
+    buf->append(_key.c_str(), _key.size() + 1);
+    for (const UniqueBsonPtr& doc : _doc_sequence) {
+        buf->append(bson_get_data(doc.get()), doc->len);
+    }
+    return true;
+}
+
+bool MongoMessage::ParseFromIOBuf(butil::IOBuf* payload) {
+    bool success = false;
+    do {
+        // TODO: support checksum
+        payload->cutn(&_flag_bits, sizeof(_flag_bits));
+        _flag_bits = butil::ByteSwapToLE32(_flag_bits);
+        if (_flag_bits != 0) {
+            LOG(WARNING) << "current only zero flag is supported";
+            break;
+        }
+
+        // parse first section
+        uint8_t payload_type;
+        payload->cutn(&payload_type, sizeof(payload_type));
+        if (payload_type != MONGO_PAYLOAD_TYPE_BODY) {
+            break;
+        }
+
+        _body = butil::bson::ExtractBsonFromIOBuf(*payload);
+        if (!_body) {
+            break;
+        }
+        payload->pop_front(_body->len);
+
+        if (payload->size() == 0) {
+            success = true;
+            break;
+        }
+
+        // parse second section
+        payload->cutn(&payload_type, sizeof(payload_type));
+        if (payload_type != MONGO_PAYLOAD_TYPE_DOC_SEQUENCE) {
+            break;
+        }
+
+        uint32_t section_length = 0;
+        payload->cutn(&section_length, sizeof(section_length));
+        section_length = butil::ByteSwapToLE32(section_length);
+        if (section_length != sizeof(section_length) + payload->size()) {
+            break;
+        }
+
+        char c = '\0';
+        while (payload->cut1(&c) && c != '\0') {
+            _key.push_back(c);
+        }
+        if (c == '\0') {
+            _key.push_back(c);
+        } else {
+            break;
+        }
+
+        butil::bson::BsonEnumerator enumerator(payload);
+        while (const bson_t* doc = enumerator.Next()) {
+            _doc_sequence.emplace_back(bson_copy(doc));
+        };
+        if (!enumerator.HasError()) {
+            success = true;
+        }
+    } while (false);
+
+    return success;
+}
+
+void MongoMessage::Print(std::ostream& os) const {
+    os << "{";
+    do {
+        if (!_body) {
+            break;
+        }
+
+        if (_head.message_length != 0) {
+            os << " \"message_length\" : " << _head.message_length << ",";
+            os << " \"request_id\" : " << _head.request_id << ",";
+            os << " \"response_to\" : " << _head.response_to << ",";
+            os << " \"op_code\" : " << _head.op_code << ",";
+        }
+
+        os << " \"flag_bits\" : " << _flag_bits << ",";
+        os << " \"body\" : ";
+        char *json = bson_as_json(_body.get(), nullptr);
+        os << json;
+        bson_free(json);
+
+        if (_key.empty() || _doc_sequence.empty()) {
+            break;
+        }
+        os << ", \"" << _key << "\" : [";
+        size_t i = 0;
+        for (const UniqueBsonPtr& doc : _doc_sequence) {
+            json = bson_as_json(doc.get(), nullptr);
+            os << json;
+            ++i;
+            if (i != _doc_sequence.size()) {
+                os << ", ";
+            }
+            bson_free(json);
+        }
+        os << "]";
+    } while (false);
+    os << "}";
+}
+
+MongoMessage::MongoMessage()
+    : ::google::protobuf::Message() {
+    SharedCtor();
+}
+
+MongoMessage::MongoMessage(const MongoMessage& from)
+    : ::google::protobuf::Message() {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+MongoMessage::~MongoMessage() {
+    SharedDtor();
+}
+
+void MongoMessage::SharedCtor() {
+    _cached_size = 0;
+    memset(&_head, 0, sizeof(_head));
+    _flag_bits = 0;
+}
+
+void MongoMessage::SharedDtor() {
+}
+
+void MongoMessage::SetCachedSize(int size) const {
+    _cached_size = size;
+}
+
+MongoMessage* MongoMessage::New() const {
+    return new MongoMessage;
+}
+
+#if GOOGLE_PROTOBUF_VERSION >= 3006000
+MongoMessage* MongoMessage::New(::google::protobuf::Arena* arena) const {
+    return CreateMaybeMessage<MongoMessage>(arena);
+}
+#endif
+
+void MongoMessage::Clear() {
+    _cached_size = 0;
+    memset(&_head, 0, sizeof(_head));
+    _flag_bits = 0;
+    _body.reset();
+    _key.clear();
+    _doc_sequence.clear();
+}
+
+bool MongoMessage::MergePartialFromCodedStream(
+    ::google::protobuf::io::CodedInputStream*) {
+    LOG(WARNING) << "You're not supposed to parse a MongoMessage";
+    return true;
+}
+
+void MongoMessage::SerializeWithCachedSizes(
+    ::google::protobuf::io::CodedOutputStream*) const {
+    LOG(WARNING) << "You're not supposed to serialize a MongoMessage";
+}
+
+::google::protobuf::uint8* MongoMessage::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+    LOG(WARNING) << "You're not supposed to serialize a MongoMessage";
+    return target;
+}
+
+int MongoMessage::ByteSize() const {
+    int total_size = 0;
+    do {
+        if (!_body) {
+            break;
+        }
+        // header + flag_bits(4) + section_kind(1) + body
+        total_size += sizeof(_head) + 5 + _body->len;
+        // section_kind(1) + section_length(4) + doc_sequence
+        if (!_key.empty() && !_doc_sequence.empty()) {
+            total_size += 5;
+            for (const UniqueBsonPtr& doc : _doc_sequence) {
+                total_size += doc->len;
+            }
+        }
+    } while (false);
+    _cached_size = total_size;
+    return total_size;
+}
+
+void MongoMessage::MergeFrom(const ::google::protobuf::Message& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    const MongoMessage* source = dynamic_cast<const MongoMessage*>(&from);
+    if (source == NULL) {
+        ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+    } else {
+        MergeFrom(*source);
+    }
+}
+
+void MongoMessage::MergeFrom(const MongoMessage& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    if (_body || !_doc_sequence.empty()) {
+        LOG(WARNING) << "You're not supposed to merge a non-empty MongoMessage";
+        return;
+    }
+    _head = from._head;
+    _flag_bits = from._flag_bits;
+    _body.reset(bson_copy(from._body.get()));
+    _key = from._key;
+    for (const UniqueBsonPtr& doc : from._doc_sequence) {
+        _doc_sequence.emplace_back(bson_copy(doc.get()));
+    }
+    _cached_size = from._cached_size;
+}
+
+void MongoMessage::CopyFrom(const ::google::protobuf::Message& from) {
+    if (&from == this) return;
+    Clear();
+    MergeFrom(from);
+}
+
+void MongoMessage::CopyFrom(const MongoMessage& from) {
+    if (&from == this) return;
+    Clear();
+    MergeFrom(from);
+}
+
+bool MongoMessage::IsInitialized() const {
+    return _body != nullptr;
+}
+
+void MongoMessage::Swap(MongoMessage* other) {
+    if (other != this) {
+        std::swap(_head, other->_head);
+        std::swap(_flag_bits, other->_flag_bits);
+        std::swap(_body, other->_body);
+        std::swap(_key, other->_key);
+        std::swap(_doc_sequence, other->_doc_sequence);
+    }
+}
+
+const ::google::protobuf::Descriptor* MongoMessage::descriptor() {
+    return MongoMessageBase::descriptor();
+}
+
+::google::protobuf::Metadata MongoMessage::GetMetadata() const {
+    ::google::protobuf::Metadata metadata;
+    metadata.descriptor = MongoMessage::descriptor();
+    metadata.reflection = NULL;
+    return metadata;
+}
+
+std::ostream& operator<<(std::ostream& os, const MongoMessage& msg) {
+    msg.Print(os);
+    return os;
+}
+
+}  // namespace brpc
diff --git a/src/brpc/mongo.h b/src/brpc/mongo.h
new file mode 100644
index 0000000000..63921a6413
--- /dev/null
+++ b/src/brpc/mongo.h
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_MONGO_H
+#define BRPC_MONGO_H
+
+#include <google/protobuf/message.h>
+
+#include "brpc/mongo_head.h"
+#include "brpc/pb_compat.h"
+#include "brpc/parse_result.h"
+#include "butil/iobuf.h"
+#include "butil/bson_util.h"
+
+namespace brpc {
+
+
+class MongoMessage : public ::google::protobuf::Message {
+public:
+    MongoMessage();
+    virtual ~MongoMessage();
+    MongoMessage(const MongoMessage& from);
+    MongoMessage& operator=(const MongoMessage& from) {
+        CopyFrom(from);
+        return *this;
+    };
+    void Swap(MongoMessage* other);
+
+    mongo_head_t& head() {
+        return _head;
+    }
+
+    const mongo_head_t& head() const {
+        return _head;
+    }
+
+    const uint32_t& flag_bits() const {
+        return _flag_bits;
+    }
+
+    void set_flag_bits(int32_t flag_bits) {
+        _flag_bits = flag_bits;
+    }
+
+    const std::string& key() const {
+        return _key;
+    }
+
+    void set_key(std::string key) {
+        _key = std::move(key);
+    }
+
+    const bson_t* body() const {
+        return _body.get();
+    }
+
+    void set_body(bson_t* body) {
+        _body.reset(bson_copy(body));
+    }
+
+    void set_body(butil::bson::UniqueBsonPtr&& body) {
+        _body.reset(body.release());
+    }
+
+    bson_t* doc_sequence(size_t i) {
+        return _doc_sequence[i].get();
+    }
+
+    size_t doc_sequence_size() {
+        return _doc_sequence.size();
+    }
+
+    void add_doc_sequence() {
+        _doc_sequence.emplace_back(bson_new());
+    }
+
+    void add_doc_sequence(butil::bson::UniqueBsonPtr&& doc) {
+        _doc_sequence.emplace_back(std::move(doc));
+    }
+
+    // Serialize/Parse |Payload| from IOBuf
+    bool ParseFromIOBuf(butil::IOBuf* payload);
+    bool SerializeToIOBuf(butil::IOBuf* buf) const;
+
+    // Protobuf methods.
+    MongoMessage* New() const PB_319_OVERRIDE;
+#if GOOGLE_PROTOBUF_VERSION >= 3006000
+    MongoMessage* New(::google::protobuf::Arena* arena) const override;
+#endif
+    void CopyFrom(const ::google::protobuf::Message& from) PB_321_OVERRIDE;
+    void MergeFrom(const ::google::protobuf::Message& from) override;
+    void CopyFrom(const MongoMessage& from);
+    void MergeFrom(const MongoMessage& from);
+    void Clear() override;
+    bool IsInitialized() const override;
+
+    int ByteSize() const;
+    bool MergePartialFromCodedStream(
+        ::google::protobuf::io::CodedInputStream* input) PB_310_OVERRIDE;
+    void SerializeWithCachedSizes(
+        ::google::protobuf::io::CodedOutputStream* output) const PB_310_OVERRIDE;
+    ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const PB_310_OVERRIDE;
+    int GetCachedSize() const override { return _cached_size; }
+
+    static const ::google::protobuf::Descriptor* descriptor();
+
+    void Print(std::ostream&) const;
+
+protected:
+    ::google::protobuf::Metadata GetMetadata() const override;
+
+private:
+    using UniqueBsonPtr = butil::bson::UniqueBsonPtr;
+
+    void SharedCtor();
+    void SharedDtor();
+    virtual void SetCachedSize(int size) const override;
+
+    mongo_head_t _head;
+    uint32_t _flag_bits;
+    UniqueBsonPtr _body;
+    std::string _key;
+    std::vector<UniqueBsonPtr> _doc_sequence;
+
+    mutable int _cached_size;   // ByteSize
+};
+
+std::ostream& operator<<(std::ostream& os, const MongoMessage&);
+
+}  // namespace brpc
+#endif  // BRPC_MONGO_H
diff --git a/src/brpc/mongo_head.h b/src/brpc/mongo_head.h
index b9da0171f2..732b94bec5 100644
--- a/src/brpc/mongo_head.h
+++ b/src/brpc/mongo_head.h
@@ -28,19 +28,33 @@ namespace brpc {
 //   https://docs.mongodb.org/manual/reference/mongodb-wire-protocol/#request-opcodes
 enum MongoOpCode {
     MONGO_OPCODE_REPLY         = 1,
-    MONGO_OPCODE_MSG           = 1000,
     MONGO_OPCODE_UPDATE        = 2001,
     MONGO_OPCODE_INSERT        = 2002,
     MONGO_OPCODE_QUERY         = 2004,
     MONGO_OPCODE_GET_MORE      = 2005,
     MONGO_OPCODE_DELETE        = 2006,
     MONGO_OPCODE_KILL_CURSORS  = 2007,
+    MONGO_OPCODE_COMPRESSED    = 2012,
+    MONGO_OPCODE_MSG           = 2013,
+};
+
+enum MongoMsgFlag {
+    MONGO_MSG_FLAG_CHECKSUM_PRESENT = 1 << 0,
+    MONGO_MSG_FLAG_MORE_TO_COME     = 1 << 2,
+    MONGO_MSG_FLAG_EXHAUST_ALLOWED  = 1 << 3,
+};
+
+enum MongoPayloadType {
+    MONGO_PAYLOAD_TYPE_BODY = 0,
+    MONGO_PAYLOAD_TYPE_DOC_SEQUENCE = 1,
+    MONGO_PAYLOAD_TYPE_INTERNAL = 2,
 };
 
 inline bool is_mongo_opcode(int32_t op_code) {
     switch (op_code) {
     case MONGO_OPCODE_REPLY:         return true;
     case MONGO_OPCODE_MSG:           return true;
+    case MONGO_OPCODE_COMPRESSED:    return true;
     case MONGO_OPCODE_UPDATE:        return true; 
     case MONGO_OPCODE_INSERT:        return true; 
     case MONGO_OPCODE_QUERY:         return true; 
diff --git a/src/brpc/policy/mongo.proto b/src/brpc/policy/mongo.proto
index 87b839b3ad..1646935757 100644
--- a/src/brpc/policy/mongo.proto
+++ b/src/brpc/policy/mongo.proto
@@ -24,40 +24,8 @@ option java_generic_services = true;
 option java_package="com.brpc.policy";
 option java_outer_classname="MongoProto";
 
-enum MongoOp {
-    OPREPLY = 1;
-    DBMSG = 1000;
-    DB_UPDATE = 2001;
-    DB_INSERT = 2002;
-    DB_QUERY = 2004;
-    DB_GETMORE = 2005;
-    DB_DELETE = 2006;
-    DB_KILLCURSORS = 2007;
-    DB_COMMAND = 2008;
-    DB_COMMANDREPLY = 2009;
-}
-
-message MongoHeader {
-    required int32 message_length = 1;
-    required int32 request_id = 2;
-    required int32 response_to = 3;
-    required MongoOp op_code = 4;
-}
-
-message MongoRequest {
-    required MongoHeader header = 1;
-    required string message = 2;
-}
-
-message MongoResponse {
-    required MongoHeader header = 1;
-    required int32 response_flags = 2;
-    required int64 cursor_id = 3;
-    required int32 starting_from = 4;
-    required int32 number_returned = 5;
-    required string message = 6;
-}
+import "brpc/proto_base.proto";
 
 service MongoService {
-    rpc default_method(MongoRequest) returns (MongoResponse);
+     rpc default_method(brpc.MongoMessageBase) returns (brpc.MongoMessageBase);
 }
diff --git a/src/brpc/policy/mongo_protocol.cpp b/src/brpc/policy/mongo_protocol.cpp
index 82bb3e0b36..174b9d3e9c 100644
--- a/src/brpc/policy/mongo_protocol.cpp
+++ b/src/brpc/policy/mongo_protocol.cpp
@@ -24,12 +24,13 @@
 #include "brpc/socket.h"                   // Socket
 #include "brpc/server.h"                   // Server
 #include "brpc/span.h"
+#include "brpc/mongo.h"
 #include "brpc/mongo_head.h"
 #include "brpc/details/server_private_accessor.h"
 #include "brpc/details/controller_private_accessor.h"
 #include "brpc/mongo_service_adaptor.h"
 #include "brpc/policy/most_common_message.h"
-#include "brpc/policy/nshead_protocol.h"
+#include "brpc/policy/mongo_protocol.h"
 #include "brpc/policy/mongo.pb.h"
 #include "brpc/details/usercode_backup_pool.h"
 
@@ -41,6 +42,44 @@ void bthread_assign_data(void* data);
 namespace brpc {
 namespace policy {
 
+class MongoStreamData : public StreamUserData {
+public:
+    // @StreamUserData
+    virtual void DestroyStreamUserData(SocketUniquePtr& sending_sock,
+                                       Controller* cntl,
+                                       int error_code,
+                                       bool end_of_rpc) override;
+
+    void set_request_id(uint32_t request_id) {
+        _request_id = request_id;
+    }
+
+    void set_correlation_id(uint64_t correlation_id) {
+        _correlation_id = correlation_id;
+    }
+
+    uint32_t request_id() const {
+        return _request_id;
+    }
+    uint64_t correlation_id() const {
+        return _correlation_id;
+    }
+
+private:
+    uint64_t _correlation_id;
+    uint32_t _request_id;
+};
+
+class MongoStreamCreator : public StreamCreator {
+public:
+    // @StreamCreator
+    virtual StreamUserData* OnCreatingStream(SocketUniquePtr* inout,
+                                             Controller* cntl) override;
+    // @StreamCreator
+    virtual void DestroyStreamCreator(Controller* cntl) override;
+};
+
+
 struct SendMongoResponse : public google::protobuf::Closure {
     SendMongoResponse(const Server *server) :
         status(NULL),
@@ -53,8 +92,8 @@ struct SendMongoResponse : public google::protobuf::Closure {
     int64_t received_us;
     const Server *server;
     Controller cntl;
-    MongoRequest req;
-    MongoResponse res;
+    MongoMessage req;
+    MongoMessage res;
 };
 
 SendMongoResponse::~SendMongoResponse() {
@@ -75,26 +114,15 @@ void SendMongoResponse::Run() {
             server->options().mongo_service_adaptor;
     butil::IOBuf res_buf;
     if (cntl.Failed()) {
-        adaptor->SerializeError(res.header().response_to(), &res_buf);
-    } else if (res.has_message()) {
-        mongo_head_t header = {
-            res.header().message_length(),
-            res.header().request_id(),
-            res.header().response_to(),
-            res.header().op_code()
-        };
-        res_buf.append(static_cast<const void*>(&header), sizeof(mongo_head_t));
-        int32_t response_flags = res.response_flags();
-        int64_t cursor_id = res.cursor_id();
-        int32_t starting_from = res.starting_from();
-        int32_t number_returned = res.number_returned();
-        res_buf.append(&response_flags, sizeof(response_flags));
-        res_buf.append(&cursor_id, sizeof(cursor_id));
-        res_buf.append(&starting_from, sizeof(starting_from));
-        res_buf.append(&number_returned, sizeof(number_returned));
-        res_buf.append(res.message());
+        adaptor->SerializeError(res.head().response_to, &res_buf);
+    } else if (res.IsInitialized()) {
+        mongo_head_t head = res.head();
+        head.make_host_endian();
+        res_buf.append(&head, sizeof(head));
+        res.SerializeToIOBuf(&res_buf);
     }
 
+    // TODO: handle compress
     if (!res_buf.empty()) {
         // Have the risk of unlimited pending responses, in which case, tell
         // users to set max_concurrency.
@@ -110,18 +138,19 @@ void SendMongoResponse::Run() {
 ParseResult ParseMongoMessage(butil::IOBuf* source,
                               Socket* socket, bool /*read_eof*/, const void *arg) {
     const Server* server = static_cast<const Server*>(arg);
-    const MongoServiceAdaptor* adaptor = server->options().mongo_service_adaptor;
-    if (NULL == adaptor) {
+    const MongoServiceAdaptor* adaptor =
+        server ? server->options().mongo_service_adaptor : nullptr;
+    if (server && !adaptor) {
         // The server does not enable mongo adaptor.
         return MakeParseError(PARSE_ERROR_TRY_OTHERS);
     }
 
-    char buf[sizeof(mongo_head_t)];
-    const char *p = (const char *)source->fetch(buf, sizeof(buf));
-    if (NULL == p) {
+    if (source->size() < sizeof(mongo_head_t)) {
         return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
     }
-    mongo_head_t header = *(const mongo_head_t*)p;
+
+    mongo_head_t header;
+    source->copy_to(&header, sizeof(header));
     header.make_host_endian();
     if (!is_mongo_opcode(header.op_code)) {
         // The op_code plays the role of "magic number" here.
@@ -137,13 +166,14 @@ ParseResult ParseMongoMessage(butil::IOBuf* source,
     } else if (source->length() < body_len) {
         return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
     }
+
     // Mongo protocol is a protocol with state. Each connection has its own
     // mongo context. (e.g. last error occured on the connection, the cursor
     // created by the last Query). The context is stored in
     // socket::_input_message, and created at the first time when msg
     // comes over the socket.
     Destroyable *socket_context_msg = socket->parsing_context();
-    if (NULL == socket_context_msg) {
+    if (NULL == socket_context_msg && server) {
         MongoContext *context = adaptor->CreateSocketContext();
         if (NULL == context) {
             return MakeParseError(PARSE_ERROR_NO_RESOURCE);
@@ -152,15 +182,55 @@ ParseResult ParseMongoMessage(butil::IOBuf* source,
         socket->reset_parsing_context(socket_context_msg);
     }
     policy::MostCommonMessage* msg = policy::MostCommonMessage::Get();
-    source->cutn(&msg->meta, sizeof(buf));
-    size_t act_body_len = source->cutn(&msg->payload, body_len - sizeof(buf));
-    if (act_body_len != body_len - sizeof(buf)) {
+    source->cutn(&msg->meta, sizeof(header));
+    size_t act_body_len = source->cutn(&msg->payload, body_len - sizeof(header));
+    if (act_body_len != body_len - sizeof(header)) {
         CHECK(false);     // Very unlikely, unless memory is corrupted.
         return MakeParseError(PARSE_ERROR_TRY_OTHERS);
     }
     return MakeMessage(msg);
 }
 
+void SerializeMongoRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request) {
+    if (request == NULL) {
+        return cntl->SetFailed(EREQUEST, "Request is NULL");
+    }
+    if (request->GetDescriptor() != brpc::MongoMessage::descriptor()) {
+        return cntl->SetFailed(EREQUEST, "The request is not a MongoMessage");
+    }
+    const brpc::MongoMessage* mm = static_cast<const brpc::MongoMessage*>(request);
+    if (mm->ByteSize() == 0) {
+        return cntl->SetFailed(EREQUEST, "request byte size is empty");
+    }
+    if (!mm->SerializeToIOBuf(buf)) {
+        return cntl->SetFailed(EREQUEST, "failed to serialize request");
+    }
+    cntl->set_stream_creator(butil::get_leaky_singleton<MongoStreamCreator>());
+}
+
+void PackMongoRequest(butil::IOBuf *req_buf,
+                      SocketMessage** user_message,
+                      uint64_t correlation_id,
+                      const google::protobuf::MethodDescriptor*,
+                      Controller* cntl,
+                      const butil::IOBuf& request_body,
+                      const Authenticator* auth) {
+    ControllerPrivateAccessor accessor(cntl);
+    MongoStreamData *stream_data = static_cast<MongoStreamData*>(accessor.get_stream_user_data());
+    stream_data->set_correlation_id(correlation_id);
+    // TODO(helei): handle compress
+    mongo_head_t head;
+    head.message_length = sizeof(mongo_head_t) + request_body.size();
+    head.request_id = stream_data->request_id();
+    head.response_to = 0;
+    head.op_code = MONGO_OPCODE_MSG;
+    head.make_host_endian();
+    req_buf->append(&head, sizeof(head));
+    req_buf->append(request_body);
+}
+
 // Defined in baidu_rpc_protocol.cpp
 void EndRunningCallMethodInPool(
     ::google::protobuf::Service* service,
@@ -183,13 +253,13 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
 
     const google::protobuf::ServiceDescriptor* srv_des = MongoService::descriptor();
     if (1 != srv_des->method_count()) {
-        LOG(WARNING) << "method count:" << srv_des->method_count()
-                     << " of MongoService should be equal to 1!";
+        LOG(WARNING) << "method count: " << srv_des->method_count()
+                     << "of MongoService should be equal to 1!";
     }
 
-    const Server::MethodProperty *mp =
-            ServerPrivateAccessor(server)
-            .FindMethodPropertyByFullName(srv_des->method(0)->full_name());
+    const Server::MethodProperty* mp = 
+        ServerPrivateAccessor(server)
+        .FindMethodPropertyByFullName(srv_des->method(0)->full_name());
 
     MongoContextMessage *context_msg =
         dynamic_cast<MongoContextMessage*>(socket->parsing_context());
@@ -237,7 +307,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
 
         if (NULL == mp ||
             mp->service->GetDescriptor() == BadMethodService::descriptor()) {
-            mongo_done->cntl.SetFailed(ENOMETHOD, "Fail to find default_method");
+            mongo_done->cntl.SetFailed(ENOMETHOD, "Failed to find default_method");
             break;
         }
         // Switch to service-specific error.
@@ -254,20 +324,13 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
             }
         }
         
-        if (!MongoOp_IsValid(header->op_code)) {
+        if (!is_mongo_opcode(header->op_code)) {
             mongo_done->cntl.SetFailed(EREQUEST, "Unknown op_code:%d", header->op_code);
             break;
         }
         
         mongo_done->cntl.set_log_id(header->request_id);
-        const std::string &body_str = msg->payload.to_string();
-        mongo_done->req.set_message(body_str.c_str(), body_str.size());
-        mongo_done->req.mutable_header()->set_message_length(header->message_length);
-        mongo_done->req.mutable_header()->set_request_id(header->request_id);
-        mongo_done->req.mutable_header()->set_response_to(header->response_to);
-        mongo_done->req.mutable_header()->set_op_code(
-                static_cast<MongoOp>(header->op_code));
-        mongo_done->res.mutable_header()->set_response_to(header->request_id);
+        mongo_done->req.head() = *header;
         mongo_done->received_us = msg->received_us();
 
         google::protobuf::Service* svc = mp->service;
@@ -294,5 +357,80 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
     mongo_done->Run();
 }
 
+void MongoStreamData::DestroyStreamUserData(SocketUniquePtr& sending_sock,
+                                            Controller* cntl,
+                                            int error_code,
+                                            bool end_of_rpc) {
+    butil::ResourceId<MongoStreamData> slot{ _request_id };
+    butil::return_resource(slot);
+}
+
+StreamUserData* MongoStreamCreator::OnCreatingStream(SocketUniquePtr* inout,
+                                                     Controller* cntl) {
+    butil::ResourceId<MongoStreamData> slot;
+    MongoStreamData *stream_data = butil::get_resource<MongoStreamData>(&slot);
+    stream_data->set_request_id(slot.value);
+    return stream_data;
+}
+
+void MongoStreamCreator::DestroyStreamCreator(Controller* cntl) {
+    // MongoStreamCreator is a global singleton value, Don't delete
+    // it in this function.
+}
+
+void ProcessMongoResponse(InputMessageBase* msg_base) {
+    const int64_t start_parse_us = butil::cpuwide_time_us();
+    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
+    mongo_head_t head;
+    msg->meta.copy_to(&head, sizeof(head));
+    head.make_host_endian();
+    butil::ResourceId<MongoStreamData> slot{ head.response_to };
+    MongoStreamData* stream_data = butil::address_resource(slot);
+    Controller* cntl = NULL;
+    if (!stream_data) {
+        LOG(ERROR) << "failed to address stream data";
+    }
+    const bthread_id_t cid = { stream_data->correlation_id() };
+    const int rc = bthread_id_lock(cid, (void**)&cntl);
+    if (rc != 0) {
+        LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
+            << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
+        return;
+    }
+
+    ControllerPrivateAccessor accessor(cntl);
+    Span* span = accessor.span();
+    if (span) {
+        span->set_base_real_us(msg->base_real_us());
+        span->set_received_us(msg->received_us());
+        span->set_response_size(msg->meta.size() + msg->payload.size() + 12);
+        span->set_start_parse_us(start_parse_us);
+    }
+
+    const int saved_error = cntl->ErrorCode();
+    do {
+        if (cntl->response() == NULL) {
+            break;
+        }
+        if (cntl->response()->GetDescriptor() != MongoMessage::descriptor()) {
+            cntl->SetFailed(ERESPONSE, "must be mongo response");
+            break;
+        }
+        MongoMessage* rsp = dynamic_cast<MongoMessage*>(cntl->response());
+        if (!rsp) {
+            cntl->SetFailed(ERESPONSE, "must be mongo response");
+            break;
+        }
+        msg->meta.copy_to(&rsp->head(), sizeof(rsp->head()));
+        rsp->head().make_host_endian();
+        if (!rsp->ParseFromIOBuf(&msg->payload)) {
+            cntl->SetFailed(ERESPONSE, "failed to parse mongo response");
+            break;
+        }
+    } while (false);
+    msg.reset();
+    accessor.OnResponse(cid, saved_error);
+}
+
 }  // namespace policy
 } // namespace brpc
diff --git a/src/brpc/policy/mongo_protocol.h b/src/brpc/policy/mongo_protocol.h
index 3b8e6c44c3..842a4a821f 100644
--- a/src/brpc/policy/mongo_protocol.h
+++ b/src/brpc/policy/mongo_protocol.h
@@ -20,6 +20,8 @@
 
 #include "brpc/protocol.h"
 #include "brpc/input_messenger.h"
+#include "brpc/socket_message.h"
+#include "butil/iobuf.h"
 
 
 namespace brpc {
@@ -28,9 +30,30 @@ namespace policy {
 // Parse binary format of mongo
 ParseResult ParseMongoMessage(butil::IOBuf* source, Socket* socket, bool read_eof, const void *arg);
 
+
 // Actions to a (client) request in mongo format
 void ProcessMongoRequest(InputMessageBase* msg);
 
+void PackMongoRequest(butil::IOBuf *buf,
+                      SocketMessage**,
+                      uint64_t correlation_id,
+                      const google::protobuf::MethodDescriptor* method,
+                      Controller* controller,
+                      const butil::IOBuf& request,
+                      const Authenticator* auth);
+
+// Actions to a (server) response in mongo format.
+void ProcessMongoResponse(InputMessageBase* msg);
+
+
+void SerializeMongoRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request);
+
+const std::string& GetMongoMethodName(
+    const google::protobuf::MethodDescriptor*,
+    const Controller*);
+
 } // namespace policy
 } // namespace brpc
 
diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto
index c0bbc086e1..c7d5df096a 100644
--- a/src/brpc/proto_base.proto
+++ b/src/brpc/proto_base.proto
@@ -24,6 +24,8 @@ message RedisResponseBase {}
 
 message EspMessageBase {}
 
+message MongoMessageBase {}
+
 message MemcacheRequestBase {}
 message MemcacheResponseBase {}
 
diff --git a/src/butil/bson_util.cc b/src/butil/bson_util.cc
index 0e42212c11..550491c907 100644
--- a/src/butil/bson_util.cc
+++ b/src/butil/bson_util.cc
@@ -63,7 +63,7 @@ BsonEnumerator::~BsonEnumerator() {
 UniqueBsonPtr ExtractBsonFromIOBuf(IOBuf& iobuf) {
     uint32_t bson_length;
     const size_t n = iobuf.copy_to(&bson_length, sizeof(bson_length));
-    if (n < sizeof(bson_length) || iobuf.size() < bson_length + sizeof(bson_length)) {
+    if (n < sizeof(bson_length) || iobuf.size() < bson_length) {
         return nullptr;
     }
     std::unique_ptr<uint8_t[]> buffer(new uint8_t[bson_length]);