brpc is a network framework open-sourced by Baidu, and it is just about the only choice among open-source high-performance C++ RPC frameworks. It is one of the best C++ open-source projects to come out of China, and I suspect the closed-source RPC libraries inside the large Chinese companies have borrowed from it as well.

Key features of brpc

  1. bthread, virtually the only industrial-grade open-source coroutine library (even though its author does not regard it as a coroutine library); there is almost no substitute for it in the open-source world.
  2. A standard RPC framework and interface: the four-parameter service method
    (google::protobuf::RpcController* cntl_base,
    const Request* request,
    Response* response,
    google::protobuf::Closure* done)

    It puts an end to the endless custom RPC invocation interfaces; from now on, RPC development can simply default to this signature (see the sketch after this list).
  3. Complete observability: logging, trace and span, plus monitoring metrics backed by bvar.
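
To make the four-parameter interface concrete, here is a minimal service sketch in the style of brpc's public echo example; the example::Echo* types and echo.pb.h come from that demo proto rather than from this post, and details may vary across brpc versions.

#include <brpc/server.h>
#include "echo.pb.h"   // generated from the demo echo.proto (assumed)

// Minimal sketch of the four-parameter interface, in the style of brpc's
// echo example; the example::Echo* types come from that demo, not this post.
class EchoServiceImpl : public example::EchoService {
public:
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) override {
        // done must be run exactly once; ClosureGuard runs it on scope exit,
        // which is what finishes the RPC.
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
        response->set_message(request->message());
        // cntl carries the transport-level extras: attachment, remote_side(), log_id...
        cntl->response_attachment().append(cntl->request_attachment());
    }
};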

bthread

bthread is a stackful coroutine: a user-space stack represents the coroutine object. Waiting and switching are managed with queues (similar to goroutines). For example, with a coroutine lock, the first bthread enters the critical section while subsequent bthreads go into a wait queue; when the first bthread is done, one bthread from the wait queue is woken up and enters the critical section.
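
As a usage-only illustration of that queueing behaviour, here is a small sketch built on the public bthread API (bthread_mutex_t and friends); it shows how user code experiences it, not the TaskGroup internals.

#include <bthread/bthread.h>

// bthreads that lose the race on a bthread_mutex_t are parked in its wait
// queue and woken one at a time, instead of blocking the worker pthread.
static bthread_mutex_t g_mutex;
static int g_counter = 0;

static void* add_under_lock(void*) {
    bthread_mutex_lock(&g_mutex);    // later arrivals are queued and yield the worker
    ++g_counter;
    bthread_mutex_unlock(&g_mutex);  // wakes one waiting bthread, if any
    return NULL;
}

int main() {
    bthread_mutex_init(&g_mutex, NULL);
    bthread_t tids[4];
    for (int i = 0; i < 4; ++i) {
        bthread_start_background(&tids[i], NULL, add_under_lock, NULL);
    }
    for (int i = 0; i < 4; ++i) {
        bthread_join(tids[i], NULL);
    }
    bthread_mutex_destroy(&g_mutex);
    return g_counter == 4 ? 0 : 1;
}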

bthreads are managed mainly by two classes, TaskGroup and TaskControl; the "task" in their names simply means a bthread.

TaskGroup

A TaskGroup is a "thread-local group of tasks": it represents the bthread coroutines on one pthread. A TaskGroup runs bthreads and switches the current thread between them. Its main methods are:

  1. start_foreground runs a bthread immediately:

    static int start_foreground(TaskGroup** pg,
                                bthread_t* __restrict tid,
                                const bthread_attr_t* __restrict attr,
                                void* (*fn)(void*),
                                void* __restrict arg);
  2. start_background enqueues a bthread into a run queue (_rq, or _remote_rq when REMOTE is true) where it waits to be scheduled:

    template <bool REMOTE>
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void* (*fn)(void*),
                         void* __restrict arg);
  3. sched(TaskGroup** pg) pops a bthread from the group's queues and runs it. Switching away from the current bthread also goes through TaskGroup::sched.

A TaskGroup owns two task queues, _rq and _remote_rq. _rq holds tasks submitted by bthreads already running on this thread, while _remote_rq holds tasks submitted to this thread by other threads. Unlike _remote_rq, _rq is always consumed by a single thread and needs no lock, so it is faster; the two-queue split is one of bthread's performance optimizations.

void TaskGroup::sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }
    sched_to(pg, next_tid);
}

bool steal_task(bthread_t* tid) {
    if (_remote_rq.pop(tid)) {
        return true;
    }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
    _last_pl_state = _pl->get_state();
#endif
    return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
  4. sched_to(TaskGroup** pg, bthread_t next_tid) immediately runs the bthread identified by next_tid; this is the actual coroutine switch.
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid); // locate the TaskMeta directly from the tid
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            ...
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;

    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;

    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // Logging must be done after switching the local storage, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                CheckBthreadScheSafety();
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);

                ....
}
  5. int usleep(TaskGroup** pg, uint64_t timeout_us) and void yield(TaskGroup** pg)

usleep registers a timer task and then switches to another bthread; yield simply requeues the current bthread and reschedules (a sketch of the public wrappers follows the code).

int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
    if (0 == timeout_us) {
        yield(pg);
        return 0;
    }
    TaskGroup* g = *pg;
    // We have to schedule timer after we switched to next bthread otherwise
    // the timer may wake up(jump to) current still-running context.
    SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
    g->set_remained(_add_sleep_event, &e);
    sched(pg);
    g = *pg;
    ...

void TaskGroup::yield(TaskGroup** pg) {
    TaskGroup* g = *pg;
    ReadyToRunArgs args = { g->_cur_meta, false };
    g->set_remained(ready_to_run_in_worker, &args);
    sched(pg);
}
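
TaskGroup::usleep and TaskGroup::yield back the public wrappers bthread_usleep() and bthread_yield(). A minimal sketch of the user-visible behaviour, assuming a standard brpc build:

#include <stdio.h>
#include <bthread/bthread.h>

// bthread_usleep() parks only the calling bthread (a timer task plus a
// context switch), so the worker pthread keeps running other bthreads.
static void* sleeper(void*) {
    bthread_usleep(100 * 1000);   // 100ms; suspends this bthread only
    printf("sleeper woke up\n");
    return NULL;
}

static void* worker(void*) {
    printf("worker runs while sleeper is parked\n");
    bthread_yield();              // requeue the current bthread and reschedule
    return NULL;
}

int main() {
    bthread_t a, b;
    bthread_start_background(&a, NULL, sleeper, NULL);
    bthread_start_background(&b, NULL, worker, NULL);
    bthread_join(a, NULL);
    bthread_join(b, NULL);
    return 0;
}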

TaskControl

TaskControl manages the TaskGroups.

  1. Creating a TaskGroup: create_group calls _add_group(group, tag).

TaskControl keeps the TaskGroups in std::vector<TaggedGroups> _tagged_groups; adding a group boils down to _tagged_groups[tag][ngroup] = g;.

Each tag of a TaskControl maps to a number of TaskGroups.

// Create a TaskGroup in this control.
TaskGroup* create_group(bthread_tag_t tag);

TaskGroup* TaskControl::create_group(bthread_tag_t tag) {
    TaskGroup* g = new (std::nothrow) TaskGroup(this);
    if (NULL == g) {
        LOG(FATAL) << "Fail to new TaskGroup";
        return NULL;
    }
    if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
        LOG(ERROR) << "Fail to init TaskGroup";
        delete g;
        return NULL;
    }
    if (_add_group(g, tag) != 0) {
        delete g;
        return NULL;
    }
    return g;
}

int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
    std::unique_lock<butil::Mutex> mu(_modify_group_mutex);
    if (_stop) {
        return -1;
    }
    g->set_tag(tag);
    g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]);
    size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
    if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
        _tagged_groups[tag][ngroup] = g;
        _tagged_ngroup[tag].store(ngroup + 1, butil::memory_order_release);
    }
    mu.unlock();
    return 0;
}

typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
std::vector<butil::atomic<size_t>> _tagged_ngroup;
std::vector<TaggedGroups> _tagged_groups;
  2. bool steal_task(bthread_t* tid, size_t* seed, size_t offset): TaskControl steals a bthread from the TaskGroups under the current thread's tag so that the current worker can run it.
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // tls_task_group is the current thread's TaskGroup,
    // defined as a thread-local variable.
    auto tag = tls_task_group->tag();

    const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    auto& groups = tag_group(tag);
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}
  3. TaskControl::signal_task(int num_task, bthread_tag_t tag) wakes up TaskGroups belonging to the given tag.

Notably, the signalling relies on the ParkingLot class, which wraps a semaphore-like primitive on top of futex_wait_private and futex_wake_private.
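
For readers unfamiliar with futex, the following is not brpc's code but a bare-bones sketch of the wait/wake pattern that ParkingLot wraps: a worker sleeps only if the counter still equals the value it read, and a producer bumps the counter before waking at most one sleeper.

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static int g_pending_signal = 0;

static long sys_futex(int* addr, int op, int val) {
    return syscall(SYS_futex, addr, op, val, NULL, NULL, 0);
}

// Worker side: sleep unless someone signalled after `expected` was read.
static void wait_if_unchanged(int expected) {
    // FUTEX_WAIT_PRIVATE returns immediately if *addr != expected,
    // so a signal arriving between the read and the wait is never lost.
    sys_futex(&g_pending_signal, FUTEX_WAIT_PRIVATE, expected);
}

// Producer side: publish one unit of work and wake up one sleeping worker.
static void signal_one() {
    __atomic_add_fetch(&g_pending_signal, 1, __ATOMIC_SEQ_CST);
    sys_futex(&g_pending_signal, FUTEX_WAKE_PRIVATE, 1);
}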

void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
    if (num_task <= 0) {
        return;
    }
    if (num_task > 2) {
        num_task = 2;
    }
    auto& pl = tag_pl(tag);
    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
    num_task -= pl[start_index].signal(1);
    if (num_task > 0) {
        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
            if (++start_index >= PARKING_LOT_NUM) {
                start_index = 0;
            }
            num_task -= pl[start_index].signal(1);
        }
    }
}
  4. TaskGroup* choose_one_group(bthread_tag_t tag) randomly picks one TaskGroup under the given tag.
TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
    CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
    auto& groups = tag_group(tag);
    const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
    if (ngroup != 0) {
        return groups[butil::fast_rand_less_than(ngroup)];
    }
    CHECK(false) << "Impossible: ngroup is 0";
    return NULL;
}

How bthread coroutines are implemented

bthread is a stackful coroutine. When a coroutine is created, a coroutine stack is allocated in the thread's address space and the coroutine function is loaded onto it; running the coroutine means running functions on that stack, just like frames being pushed and popped on a thread stack; switching coroutines saves the coroutine context onto its stack; exiting a coroutine destroys its stack.

The implementation centers on bthread_make_fcontext (create a coroutine stack context) and bthread_jump_fcontext (jump between coroutine stacks), both in context.cpp.
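
As an analogy only (bthread ships its own assembly rather than using ucontext), glibc's ucontext API goes through the same three steps: allocate a stack, load an entry function onto it, and jump between stacks.

#include <stdio.h>
#include <ucontext.h>

static ucontext_t main_ctx, co_ctx;
static char co_stack[64 * 1024];          // the coroutine's private stack

static void co_fn() {
    printf("in coroutine\n");
    swapcontext(&co_ctx, &main_ctx);      // "yield": save co_ctx, resume main
    printf("coroutine resumed\n");
}

int main() {
    getcontext(&co_ctx);
    co_ctx.uc_stack.ss_sp = co_stack;     // install the coroutine stack
    co_ctx.uc_stack.ss_size = sizeof(co_stack);
    co_ctx.uc_link = &main_ctx;           // where to go when co_fn returns
    makecontext(&co_ctx, co_fn, 0);       // load the entry function

    swapcontext(&main_ctx, &co_ctx);      // first switch into the coroutine
    printf("back in main\n");
    swapcontext(&main_ctx, &co_ctx);      // resume it until co_fn returns
    printf("coroutine finished\n");
    return 0;
}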

For more details on the implementation, see https://sf-zhou.github.io/brpc/brpc_01_bthread.html

Summary

bthread provides an industrial-grade coroutine implementation that lets brpc run its handlers inside bthreads, and it offers the usual calls such as sleep, yield and join.

TaskControl is the class that manages and schedules bthreads; it also provides steal_task to take bthreads away from a given thread, an interface that can be used to improve load balance across threads.

With these internals, coroutines can be managed through the following steps (a sketch using the public API follows the list):

  1. taskcontrol.init(int nconcurrency) creates the given number of worker threads.
  2. taskcontrol.create_group(tag) creates a TaskGroup, which is owned by one fixed thread, and returns it.
  3. taskgroup.start_background starts a bthread task on that TaskGroup, which enables the flexible trick of "creating a coroutine on a designated thread".
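
TaskControl and TaskGroup are internal classes, though; the public way to get the same "run a bthread on a chosen worker group" effect is the tag field of bthread_attr_t (only in brpc versions with bthread-tag support). A hedged sketch:

#include <bthread/bthread.h>

// Sketch, assuming a brpc build with bthread-tag support: start a bthread on
// the workers registered under tag 1 instead of the default tag.
static void* my_task(void*) { return NULL; }

int start_on_tag1() {
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
    attr.tag = 1;                 // routes the bthread to tag 1's TaskGroups
    bthread_t tid;
    if (bthread_start_background(&tid, &attr, my_task, NULL) != 0) {
        return -1;
    }
    return bthread_join(tid, NULL);
}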

brpc

brpc is a network library: its lower layers handle sockets, events and network packets, while its upper layers handle http/protobuf protocols and services. It also ships naming services that resolve a name into a group of downstream machines, and load balancers (lb) that pick a downstream machine.

Channel and the client side

A channel is a very common concept in networking; in brpc it refers specifically to the communication path initiated by a client. As the source comments put it: "A Channel represents a communication line to one server or multiple servers which can be used to call that Server's services. Servers may be running on another machines. Normally, you should not call a Channel directly, but instead construct a stub Service wrapping it."

Channel mainly exposes two entry points.

Channel::Init creates the channel and sets up the connection

  1. int Init(const char* server_addr_and_port, const ChannelOptions* options) initializes the channel, i.e. creates and initiates a connection:
int Channel::Init(const char* server_addr, int port,
                  const ChannelOptions* options) {
    GlobalInitializeOrDie();
    butil::EndPoint point;
    const AdaptiveProtocolType& ptype = (options ? options->protocol : _options.protocol);
    const Protocol* protocol = FindProtocol(ptype);
    if (protocol == NULL || !protocol->support_client()) {
        LOG(ERROR) << "Channel does not support the protocol";
        return -1;
    }
    if (protocol->parse_server_address != NULL) {
        if (!protocol->parse_server_address(&point, server_addr)) {
            LOG(ERROR) << "Fail to parse address=`" << server_addr << '\'';
            return -1;
        }
        point.port = port;
    } else {
        if (str2endpoint(server_addr, port, &point) != 0 &&
            hostname2endpoint(server_addr, port, &point) != 0) {
            return -1;
        }
    }
    return InitSingle(point, server_addr, options, port);
}

The Channel::InitSingle function:

a. GlobalInitializeOrDie();
b. InitChannelOptions(options)
c. CreateSocketSSLContext(_options, &ssl_ctx)
d. SocketMapInsert(SocketMapKey(server_addr_and_port, sig),

The core work clearly happens in SocketMapInsert, i.e. SocketMap::Insert(const SocketMapKey& key, SocketId* id, ...):

a. SingleConnection* sc = _map.seek(key); if the connection already exists, return it.
b. _options.socket_creator->CreateSocket(opt, &tmp_id) creates the connection.
c. _map[key] = new_sc; adds the connection to the map.

Socket::OnCreated

a. Fill in member fields such as _remote_side and _local_side.
b. DoConnect(options.connect_abstime, NULL, NULL) creates the socket and initiates the connection.

int Socket::DoConnect(const timespec* abstime,
                      int (*on_connect)(int, int, void*), void* data) {
    if (_conn) {
        return _conn->Connect(this, abstime, on_connect, data);
    } else {
        // creates a new socket fd and then calls ::connect
        return Connect(abstime, on_connect, data);
    }
}

stub.method: sending a request to the server through the channel

Client-side call:

// Initialize the channel
brpc::Channel channel;

// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
}

example::EchoService_Stub stub(&channel);

example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;

request.set_message(g_request);
cntl.set_log_id(log_id++);  // set by user

cntl.request_attachment().append(g_attachment);

stub.Echo(&cntl, &request, &response, NULL);

// The generated stub forwards the call to channel_->CallMethod
void EchoService_Stub::Echo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                            const ::example::EchoRequest* request,
                            ::example::EchoResponse* response,
                            ::google::protobuf::Closure* done) {
    channel_->CallMethod(descriptor()->method(0),
                         controller, request, response, done);
}

Channel::CallMethod

void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
                         google::protobuf::RpcController* controller_base,
                         const google::protobuf::Message* request,
                         google::protobuf::Message* response,
                         google::protobuf::Closure* done) {
    const int64_t start_send_real_us = butil::gettimeofday_us();
    Controller* cntl = static_cast<Controller*>(controller_base);
    cntl->OnRPCBegin(start_send_real_us);
    ...
    cntl->_response = response;
    cntl->_done = done;
    cntl->_pack_request = _pack_request;
    cntl->_method = method;
    cntl->_auth = _options.auth;

    if (SingleServer()) {
        cntl->_single_server_id = _server_id;
        cntl->_remote_side = _server_address;
    }

    // Share the lb with controller.
    cntl->_lb = _lb;

    _serialize_request(&cntl->_request_buf, cntl, request);

    cntl->IssueRPC(start_send_real_us);
    if (done == NULL) {
        // if done is NULL, this is a synchronous call: block until the RPC ends
        Join(correlation_id);
        if (cntl->_span) {
            cntl->SubmitSpan();
        }
        cntl->OnRPCEnd(butil::gettimeofday_us());
    }
}
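
The asynchronous counterpart passes a non-NULL done, so CallMethod returns right after IssueRPC and the closure runs when the RPC completes. A sketch modelled on brpc's asynchronous_echo example (the example::Echo* types are again from the demo proto, not this post):

#include <memory>
#include <string>
#include <brpc/channel.h>
#include <brpc/callback.h>
#include <butil/logging.h>
#include "echo.pb.h"   // example::EchoService from brpc's echo demo (assumed)

static void HandleEchoResponse(brpc::Controller* cntl,
                               example::EchoResponse* response) {
    // The callback takes ownership of the heap-allocated cntl/response.
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<example::EchoResponse> response_guard(response);
    if (cntl->Failed()) {
        LOG(WARNING) << "RPC failed: " << cntl->ErrorText();
        return;
    }
    LOG(INFO) << "Received: " << response->message();
}

void AsyncEcho(example::EchoService_Stub* stub, const std::string& msg) {
    example::EchoRequest request;
    request.set_message(msg);
    brpc::Controller* cntl = new brpc::Controller;
    example::EchoResponse* response = new example::EchoResponse;
    google::protobuf::Closure* done =
        brpc::NewCallback(&HandleEchoResponse, cntl, response);
    stub->Echo(cntl, &request, response, done);   // returns without waiting
}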

cntl->IssueRPC

void Controller::IssueRPC(int64_t start_realtime_us) {
    _current_call.begin_time_us = start_realtime_us;

    // Pick a target server for sending RPC
    _current_call.need_feedback = false;
    _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
    SocketUniquePtr tmp_sock;
    if (SingleServer()) {
        const int rc = Socket::Address(_single_server_id, &tmp_sock);

        _current_call.peer_id = _single_server_id;
    } else {
        // pick a downstream server with the load balancer
        LoadBalancer::SelectIn sel_in =
            { start_realtime_us, true,
              has_request_code(), _request_code, _accessed };
        LoadBalancer::SelectOut sel_out(&tmp_sock);
        const int rc = _lb->SelectServer(sel_in, &sel_out);
        if (rc != 0) {
            std::ostringstream os;
            DescribeOptions opt;
            opt.verbose = false;
            _lb->Describe(os, opt);
            SetFailed(rc, "Fail to select server from %s", os.str().c_str());
            return HandleSendFailed();
        }
        _current_call.need_feedback = sel_out.need_feedback;
        _current_call.peer_id = tmp_sock->id();

        _remote_side = tmp_sock->remote_side();
    }

    // Make request
    butil::IOBuf packet;
    SocketMessage* user_packet = NULL;
    _pack_request(&packet, &user_packet, cid.value, _method, this,
                  _request_buf, using_auth);
    // TODO: PackRequest may accept SocketMessagePtr<>?
    SocketMessagePtr<> user_packet_guard(user_packet);
    Socket::WriteOptions wopt;
    wopt.id_wait = cid;
    wopt.abstime = pabstime;
    wopt.pipelined_count = _pipelined_count;
    wopt.auth_flags = _auth_flags;
    wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
    wopt.write_in_background = write_to_socket_in_background();
    int rc;
    size_t packet_size = 0;
    // write the data to the socket
    if (user_packet_guard) {
        if (span) {
            packet_size = user_packet_guard->EstimatedByteSize();
        }
        rc = _current_call.sending_sock->Write(user_packet_guard, &wopt);
    } else {
        packet_size = packet.size();
        rc = _current_call.sending_sock->Write(&packet, &wopt);
    }

    if (using_auth) {
        _current_call.sending_sock->SetAuthentication(rc);
    }
}

Acceptor and EventDispatcher

On the client side, brpc uses a Channel to create connections and send requests over them. Mirroring those two responsibilities, the foundation of the server side is accepting connections on sockets and receiving data, handled by the Acceptor and the EventDispatcher respectively.

Acceptor: accepting connections

The Acceptor is a member of Server, and Server::Start() calls Acceptor::StartAccept (a sketch of the user-facing startup path follows).
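
Before digging into the internals, here is the user-facing path that triggers it, modelled on brpc's echo server example; EchoServiceImpl is the four-parameter service sketched earlier, and the option values are illustrative.

#include <brpc/server.h>
#include <butil/logging.h>

int RunServer(int port) {
    brpc::Server server;
    EchoServiceImpl echo_service_impl;
    if (server.AddService(&echo_service_impl,
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }
    brpc::ServerOptions options;
    options.idle_timeout_sec = -1;            // never close idle connections
    // Start() is where tcp_listen() runs and the fd is handed to the Acceptor.
    if (server.Start(port, &options) != 0) {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }
    server.RunUntilAskedToQuit();             // block until SIGINT/SIGTERM
    return 0;
}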

Acceptor* _am;
Acceptor* _internal_am;

// listen socket
_listen_addr = endpoint;
for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
    _listen_addr.port = port;
    butil::fd_guard sockfd(tcp_listen(_listen_addr));
    if (_listen_addr.port == 0) {
        // port=0 makes kernel dynamically select a port from
        // https://en.wikipedia.org/wiki/Ephemeral_port
        _listen_addr.port = get_port_from_fd(sockfd);
    ...
    // Pass ownership of `sockfd' to `_am'
    if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                         _default_ssl_ctx,
                         _options.force_ssl) != 0) {
        LOG(ERROR) << "Fail to start acceptor";
        return -1;
    }

Acceptor::StartAccept registers the sockfd for read events with the EventDispatcher; the callback is OnNewConnections, i.e. accepting new connections.

The EventDispatcher is clearly an epoll-style loop: it keeps harvesting network events and hands them to whoever registered for them.

void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        OnNewConnectionsUntilEAGAIN(acception);
        if (acception->Failed()) {
            return;
        }
    } while (acception->MoreReadEvents(&progress));
}

void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
    while (1) {
        struct sockaddr_storage in_addr;
        bzero(&in_addr, sizeof(in_addr));
        socklen_t in_len = sizeof(in_addr);
        butil::fd_guard in_fd(accept(acception->fd(), (sockaddr*)&in_addr, &in_len));
        if (in_fd < 0) {
            continue;
        }

        Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
        if (NULL == am) {
            return;
        }

        SocketId socket_id;
        SocketOptions options;
        options.keytable_pool = am->_keytable_pool;
        options.fd = in_fd;
        butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
        options.user = acception->user();
        options.force_ssl = am->_force_ssl;
        options.initial_ssl_ctx = am->_ssl_ctx;
        options.on_edge_triggered_events = InputMessenger::OnNewMessages;
        options.use_rdma = am->_use_rdma;
        options.bthread_tag = am->_bthread_tag;
        if (Socket::Create(options, &socket_id) != 0) {
            LOG(ERROR) << "Fail to create Socket";
            continue;
        }

        SocketUniquePtr sock;
        if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
            bool is_running = true;
            {
                BAIDU_SCOPED_LOCK(am->_map_mutex);
                is_running = (am->status() == RUNNING);
                am->_socket_map.insert(socket_id, ConnectStatistics());
            }
        }

EventDispatcher

The number of EventDispatchers is configurable: FLAGS_event_dispatcher_num dispatchers are created for each bthread tag (see InitializeGlobalDispatchers below), and each dispatcher runs as a bthread.

The loop is epoll_wait -> CallInputEventCallback. Doesn't epoll_wait block the whole thread? The thread does block in epoll_wait, but when events arrive and execution continues, the callbacks spawn bthreads to handle them. brpc's thread-switching behaviour here deserves a more detailed analysis.

void InitializeGlobalDispatchers() {
    g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];
    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
        for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
            bthread_attr_t attr =
                FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
            attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
            // dispatcher.Start
            CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
        }
    }
}

int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
    if (_event_dispatcher_fd < 0) {
        LOG(FATAL) << "epoll was not created";
        return -1;
    }
    if (_tid != 0) {
        LOG(FATAL) << "Already started this dispatcher(" << this
                   << ") in bthread=" << _tid;
        return -1;
    }
    //
    int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, this);
    if (rc) {
        LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
        return -1;
    }
    return 0;
}

// EventDispatcher::Run is executed inside the bthread
void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event e[32];
        const int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
        if (_stop) {
            break;
        }
        if (n < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            }
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _event_dispatcher_fd;
            break;
        }
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
            }
        }
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
            }
        }
    }
}

protocol service

The upper, application-layer protocols.

For read events, the EventDispatcher's CallInputEventCallback leads to InputMessenger::OnNewMessages, which reads data from the socket and then calls InputMessenger::CutInputMessage to determine the protocol type.

How the protocol type is determined:

  1. Iterate over _handlers.
  2. Call _handlers[cur_index].parse(&m->_read_buf, m, read_eof, _handlers[cur_index].arg).
  3. If the parse succeeds, the index is found; if it fails, move on to the next handler and try again.
    Doesn't this parse-and-retry approach hurt performance? (See the sketch after this list.)
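
Schematically, the detection loop looks like the sketch below; Handler, Buffer and ParseOutcome are made-up names standing in for brpc's real handler array, butil::IOBuf and parse results. brpc also remembers the previously matched handler for a connection, which limits the cost of this trial parsing.

#include <cstddef>

// Illustrative only, not brpc's real CutInputMessage.
struct Buffer {};
enum ParseOutcome { PARSE_OK, PARSE_TRY_OTHERS, PARSE_NOT_ENOUGH_DATA };

struct Handler {
    ParseOutcome (*parse)(Buffer* read_buf);
};

// Try each registered protocol until one recognizes the bytes in read_buf.
int DetectProtocol(const Handler* handlers, size_t n, Buffer* read_buf) {
    for (size_t i = 0; i < n; ++i) {
        switch (handlers[i].parse(read_buf)) {
        case PARSE_OK:
            return static_cast<int>(i);   // remember this index for later messages
        case PARSE_NOT_ENOUGH_DATA:
            return -2;                    // wait for more bytes from the socket
        case PARSE_TRY_OTHERS:
            break;                        // not this protocol, try the next handler
        }
    }
    return -1;                            // no registered protocol matched
}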

A handler corresponds to a protocol type; once the protocol type is determined, the matching handler is used to parse the protocol.

Protocol http_protocol = { ParseHttpMessage,
                           SerializeHttpRequest, PackHttpRequest,
                           ProcessHttpRequest, ProcessHttpResponse,
                           VerifyHttpRequest, ParseHttpServerAddress,
                           GetHttpMethodName,
                           CONNECTION_TYPE_POOLED_AND_SHORT,
                           "http" };
if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) {
    exit(1);
}

Taking the protobuf RPC protocol as an example: after data is read from the socket into a msg, it is stored in an InputMessageBase:

// Any network packet can be split into a header (meta) and a body (payload).
struct BAIDU_CACHELINE_ALIGNMENT MostCommonMessage : public InputMessageBase {
    butil::IOBuf meta;
    butil::IOBuf payload;
    PipelinedInfo pi;

    inline static MostCommonMessage* Get() {
        return butil::get_object<MostCommonMessage>();
    }
  1. ParsePbFromIOBuf(&meta, msg->meta)

  2. Parse msg->payload into messages->Request(): msg->payload.cutn(&req_buf, body_without_attachment_size); ParseFromCompressedData(req_buf, messages->Request(), req_cmp_type)

  3. std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);

  4. Set up the done closure: google::protobuf::Closure* done = ::brpc::NewCallback<
    int64_t, Controller*, RpcPBMessages*,
    const Server*, MethodStatus*, int64_t>(&SendRpcResponse, ...)

  5. Look up the service:
    const Server::MethodProperty* mp =
    server_accessor.FindMethodPropertyByFullName(
    svc_name, request_meta.method_name());
    svc = mp->service;
    which yields the service object.

  6. svc->CallMethod(method, cntl.release(),
    messages->Request(),
    messages->Response(), done);

The concrete implementation of CallMethod lives in the generated protobuf code (pb.h/pb.cc).

  7. After the service method finishes, the final step is SendRpcResponse(meta.correlation_id(),
    cntl.release(), messages,
    server, method_status,
    msg->received_us());

TODO

brpc's threading model and context switching

How to use brpc, and analysis of its performance and resource usage

Well-crafted parts of the brpc and butil implementation that are worth borrowing