Skip to content

Commit b51b5d5

Browse files
committed
cetty-gearman: 完成echo协议
1 parent d74b822 commit b51b5d5

File tree

12 files changed

+100
-20
lines changed

12 files changed

+100
-20
lines changed

cetty-core/src/cetty/channel/DefaultChannelPipeline.cpp

+14-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ class DefaultChannelHandlerContext : public ChannelHandlerContext {
6161
canHandleUps(false),
6262
canHandleDowns(false),
6363
name(name),
64-
handler(handler) {
64+
handler(handler),
65+
sink() {
6566

6667
upstreamHandler = boost::dynamic_pointer_cast<ChannelUpstreamHandler>(handler);
6768

@@ -79,8 +80,6 @@ class DefaultChannelHandlerContext : public ChannelHandlerContext {
7980
throw InvalidArgumentException(
8081
"handler must be either ChannelUpstreamHandler or ChannelDownstreamHandler.");
8182
}
82-
83-
sink = this->defaultChannelPipeline.getSink();
8483
}
8584

8685
virtual ~DefaultChannelHandlerContext() {}
@@ -135,6 +134,10 @@ class DefaultChannelHandlerContext : public ChannelHandlerContext {
135134
nextDownstreamContext->downstreamHandler->handleDownstream(*nextDownstreamContext, e);
136135
}
137136
else {
137+
if (!sink) {
138+
sink = this->defaultChannelPipeline.getSink();
139+
}
140+
138141
BOOST_ASSERT(sink);
139142
sink->eventSunk(defaultChannelPipeline, e);
140143
}
@@ -156,6 +159,10 @@ class DefaultChannelHandlerContext : public ChannelHandlerContext {
156159
nextDownstreamContext->downstreamHandler->writeRequested(*nextDownstreamContext, e);
157160
}
158161
else {
162+
if (!sink) {
163+
sink = this->defaultChannelPipeline.getSink();
164+
}
165+
159166
BOOST_ASSERT(sink);
160167
sink->writeRequested(defaultChannelPipeline, e);
161168
}
@@ -177,6 +184,10 @@ class DefaultChannelHandlerContext : public ChannelHandlerContext {
177184
nextDownstreamContext->downstreamHandler->stateChangeRequested(*nextDownstreamContext, e);
178185
}
179186
else {
187+
if (!sink) {
188+
sink = this->defaultChannelPipeline.getSink();
189+
}
190+
180191
BOOST_ASSERT(sink);
181192
sink->stateChangeRequested(defaultChannelPipeline, e);
182193
}

cetty-gearman/build/GearmanClient/GearmanClient.vcxproj

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<ClCompile Include="..\..\src\cetty\gearman\GearmanClient.cpp" />
2626
<ClCompile Include="..\..\src\cetty\gearman\GearmanDecoder.cpp" />
2727
<ClCompile Include="..\..\src\cetty\gearman\GearmanEncoder.cpp" />
28+
<ClCompile Include="..\..\src\cetty\gearman\GearmanMessage.cpp" />
2829
<ClCompile Include="..\..\src\cetty\gearman\GearmanMessageChecker.cpp" />
2930
<ClCompile Include="..\..\src\cetty\gearman\GearmanMessageHandler.cpp" />
3031
<ClCompile Include="..\..\src\cetty\gearman\GearmanPipelineFactory.cpp" />

cetty-gearman/build/GearmanClient/GearmanClient.vcxproj.filters

+3
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,8 @@
6060
<ClCompile Include="..\..\src\cetty\gearman\GearmanMessageChecker.cpp">
6161
<Filter>src</Filter>
6262
</ClCompile>
63+
<ClCompile Include="..\..\src\cetty\gearman\GearmanMessage.cpp">
64+
<Filter>src</Filter>
65+
</ClCompile>
6366
</ItemGroup>
6467
</Project>

cetty-gearman/build/GearmanTest/GearmanTest.sln

+7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ Microsoft Visual Studio Solution File, Format Version 11.00
44
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "GearmanTest", "GearmanTest.vcxproj", "{73CE27B2-1F98-49B4-9E79-C4F88840ACB3}"
55
ProjectSection(ProjectDependencies) = postProject
66
{CA71411F-D9FF-4686-8DC6-56A2FC982D9B} = {CA71411F-D9FF-4686-8DC6-56A2FC982D9B}
7+
{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B} = {5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}
78
EndProjectSection
89
EndProject
910
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "GearmanClient", "..\GearmanClient\GearmanClient.vcxproj", "{CA71411F-D9FF-4686-8DC6-56A2FC982D9B}"
1011
EndProject
12+
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "cetty", "..\..\..\cetty-core\build\cetty\cetty.vcxproj", "{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}"
13+
EndProject
1114
Global
1215
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1316
Debug|Win32 = Debug|Win32
@@ -22,6 +25,10 @@ Global
2225
{CA71411F-D9FF-4686-8DC6-56A2FC982D9B}.Debug|Win32.Build.0 = Debug|Win32
2326
{CA71411F-D9FF-4686-8DC6-56A2FC982D9B}.Release|Win32.ActiveCfg = Release|Win32
2427
{CA71411F-D9FF-4686-8DC6-56A2FC982D9B}.Release|Win32.Build.0 = Release|Win32
28+
{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}.Debug|Win32.ActiveCfg = Debug|Win32
29+
{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}.Debug|Win32.Build.0 = Debug|Win32
30+
{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}.Release|Win32.ActiveCfg = Release|Win32
31+
{5C94F9F3-765C-4614-BADC-BCFEC0EE4B7B}.Release|Win32.Build.0 = Release|Win32
2532
EndGlobalSection
2633
GlobalSection(SolutionProperties) = preSolution
2734
HideSolutionNode = FALSE
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2010-2012 frankee zhou (frankee.zhou at gmail dot com)
3+
*
4+
* Distributed under under the Apache License, version 2.0 (the "License").
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
namespace cetty {
18+
namespace gearman {
19+
20+
}
21+
}

cetty-gearman/src/cetty/gearman/GearmanDecoder.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ ChannelMessage GearmanDecoder::decode(ChannelHandlerContext& ctx,
6666
}
6767
}
6868

69+
message->setType(type);
70+
6971
return ChannelMessage(message);
7072
}
7173

cetty-gearman/src/cetty/gearman/GearmanEncoder.cpp

+39-13
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ std::string GearmanEncoder::toString() const {
4040
return "GearmanEncoder";
4141
}
4242

43-
static const std::string REQUEST_MAGIC("\0REQ", 4);
44-
4543
ChannelMessage GearmanEncoder::encode(ChannelHandlerContext& ctx,
4644
const ChannelPtr& channel,
4745
const ChannelMessage& msg) {
@@ -55,23 +53,25 @@ ChannelMessage GearmanEncoder::encode(ChannelHandlerContext& ctx,
5553
int messageLength = 0;
5654

5755
if (message->hasData()) {
58-
const ChannelBufferPtr data = message->getData();
56+
const ChannelBufferPtr& data = message->getData();
5957
bodyLenth += data->readableBytes();
60-
messageLength = bodyLenth + headerLength;
58+
messageLength = bodyLenth + parametersLength;
6159

6260
if (data->aheadWritableBytes() >= parametersLength + headerLength) {
63-
data->offsetWriterIndex(-parametersLength - headerLength);
64-
65-
writeHeader(data, message->getType(), messageLength);
66-
writeParameters(data, message->getParameters(), true);
67-
61+
if (parametersLength) {
62+
writeParametersAhead(data, message->getParameters(), true);
63+
}
64+
writeHeaderAhead(data, message->getType(), messageLength);
6865
return ChannelMessage(data);
6966
}
7067
else {
7168
ChannelBufferPtr buffer = ChannelBuffers::buffer(bodyLenth + headerLength);
7269

7370
writeHeader(buffer, message->getType(), messageLength);
74-
writeParameters(buffer, message->getParameters(), true);
71+
if (parametersLength) {
72+
writeParameters(buffer, message->getParameters(), true);
73+
}
74+
7575
buffer->writeBytes(data);
7676

7777
return ChannelMessage(buffer);
@@ -82,7 +82,10 @@ ChannelMessage GearmanEncoder::encode(ChannelHandlerContext& ctx,
8282
messageLength = bodyLenth + headerLength;
8383
ChannelBufferPtr buffer = ChannelBuffers::buffer(messageLength);
8484
writeHeader(buffer, message->getType(), messageLength);
85-
writeParameters(buffer, message->getParameters(), false);
85+
86+
if (messageLength) {
87+
writeParameters(buffer, message->getParameters(), false);
88+
}
8689

8790
return ChannelMessage(buffer);
8891
}
@@ -105,15 +108,22 @@ int GearmanEncoder::caculateParametersLength(const GearmanMessagePtr& msg) {
105108
}
106109

107110
void GearmanEncoder::writeHeader(const ChannelBufferPtr& buffer, int type, int length) {
108-
buffer->writeBytes(REQUEST_MAGIC);
111+
buffer->writeBytes(GearmanMessage::REQUEST_MAGIC);
109112
buffer->writeInt(type);
110113
buffer->writeInt(length);
111114
}
112115

116+
void GearmanEncoder::writeHeaderAhead(const ChannelBufferPtr& buffer, int type, int length) {
117+
buffer->writeIntAhead(length);
118+
buffer->writeIntAhead(type);
119+
buffer->writeBytesAhead(GearmanMessage::REQUEST_MAGIC);
120+
}
121+
113122
void GearmanEncoder::writeParameters(const ChannelBufferPtr& buffer,
114123
const std::vector<std::string>& parameters,
115124
bool withZeroPad) {
116-
for (int i = 0, j = parameters.size(); i < j; ++i) {
125+
int j = parameters.size();
126+
for (int i = 0; i < j; ++i) {
117127
buffer->writeBytes(parameters[i]);
118128

119129
if (!withZeroPad && ((j - i) == 1)) {
@@ -125,6 +135,22 @@ void GearmanEncoder::writeParameters(const ChannelBufferPtr& buffer,
125135
}
126136
}
127137

138+
void GearmanEncoder::writeParametersAhead(const ChannelBufferPtr& buffer,
139+
const std::vector<std::string>& parameters,
140+
bool withZeroPad) {
141+
int j = parameters.size();
142+
if (j == 0) return;
143+
144+
if (withZeroPad) {
145+
buffer->writeByteAhead(0);
146+
}
147+
buffer->writeBytesAhead(parameters[j - 1]);
148+
149+
for (int i = j - 2; i >= 0; --i) {
150+
buffer->writeByteAhead(0);
151+
buffer->writeBytesAhead(parameters[i]);
152+
}
153+
}
128154

129155

130156
}

cetty-gearman/src/cetty/gearman/GearmanMessage.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
namespace cetty {
2121
namespace gearman {
2222

23+
const std::string GearmanMessage::RESPONSE_MAGIC("\0RES", 4);
24+
const std::string GearmanMessage::REQUEST_MAGIC("\0REQ", 4);
25+
2326
GearmanMessagePtr GearmanMessage::createEchoReqMessage(const ChannelBufferPtr& payload) {
2427
GearmanMessagePtr request(new GearmanMessage);
2528
request->setType(GearmanMessage::ECHO_REQ);

cetty-gearman/src/cetty/gearman/GearmanWorker.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
/*
32
* Copyright (c) 2010-2012 frankee zhou (frankee.zhou at gmail dot com)
43
*

cetty-gearman/test/cetty/gearman/GearmanTest.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ int main(int argc, char* argv[]) {
2525
// Print usage if no argument is specified.
2626

2727
// Parse options.
28-
std::string host = "192.168.2.145";
28+
std::string host = "192.168.1.112";
2929
int port = 4730;
3030
int ioThreadCount = 1;
3131

@@ -44,8 +44,8 @@ int main(int argc, char* argv[]) {
4444
GearmanTaskPtr task(new GearmanTask);
4545

4646
ChannelBufferPtr buffer = ChannelBuffers::buffer(1024, 64);
47-
for (int i = 0; i < 1024; i += 4) {
48-
buffer->writeInt(4);
47+
for (int i = 0; i < 1024; ++i) {
48+
buffer->writeByte('0');
4949
}
5050
task->request = GearmanMessage::createEchoReqMessage(buffer);
5151
c->write(ChannelMessage(task));

include/cetty/gearman/GearmanEncoder.h

+4
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@ class GearmanEncoder : public cetty::handler::codec::oneone::OneToOneEncoder {
4242
int caculateParametersLength(const GearmanMessagePtr& msg);
4343

4444
void writeHeader(const ChannelBufferPtr& buffer, int type, int length);
45+
void writeHeaderAhead(const ChannelBufferPtr& buffer, int type, int length);
4546
void writeParameters(const ChannelBufferPtr& buffer,
4647
const std::vector<std::string>& parameters,
4748
bool withZeroPad);
49+
void writeParametersAhead(const ChannelBufferPtr& buffer,
50+
const std::vector<std::string>& parameters,
51+
bool withZeroPad);
4852
};
4953

5054
}

include/cetty/gearman/GearmanMessage.h

+3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ class GearmanMessage : public cetty::util::ReferenceCounter<GearmanMessage, int>
8080
SUBMIT_JOB_EPOCH = 36
8181
};
8282

83+
static const std::string REQUEST_MAGIC;
84+
static const std::string RESPONSE_MAGIC;
85+
8386
public:
8487
static GearmanMessagePtr createEchoReqMessage(const ChannelBufferPtr& payload);
8588

0 commit comments

Comments
 (0)