voidTaskGroup::sched(TaskGroup** pg){ TaskGroup* g = *pg; bthread_t next_tid = 0; // Find next task to run, if none, switch to idle thread of the group. #ifndef BTHREAD_FAIR_WSQ constbool popped = g->_rq.pop(&next_tid); #else constbool popped = g->_rq.steal(&next_tid); #endif if (!popped && !g->steal_task(&next_tid)) { // Jump to main task if there's no task to run. next_tid = g->_main_tid; } sched_to(pg, next_tid); }
// Switch to the task if (__builtin_expect(next_meta != cur_meta, 1)) { g->_cur_meta = next_meta; // Switch tls_bls cur_meta->local_storage = tls_bls; tls_bls = next_meta->local_storage;
// Logging must be done after switching the local storage, since the logging lib // use bthread local storage internally, or will cause memory leak. if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) || (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) { LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> " << next_meta->tid; }
if (cur_meta->stack != NULL) { if (next_meta->stack != cur_meta->stack) { CheckBthreadScheSafety(); jump_stack(cur_meta->stack, next_meta->stack); // probably went to another group, need to assign g again. g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
.... }
// NOTE(review): presumably the in-class declarations matching
// TaskGroup::usleep / TaskGroup::yield -- confirm against the header.
//
// usleep: `timeout_us` is a duration in microseconds judging by its name
// (TODO confirm). The TaskGroup::usleep definition in this file calls
// yield(pg) and returns 0 when timeout_us is 0.
int usleep(TaskGroup** pg, uint64_t timeout_us);
// yield: takes the address of the caller's TaskGroup pointer. Its body is not
// shown in this excerpt.
void yield(TaskGroup** pg);
intTaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us){ if (0 == timeout_us) { yield(pg); return0; } TaskGroup* g = *pg; // We have to schedule timer after we switched to next bthread otherwise // the timer may wake up(jump to) current still-running context. SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g }; g->set_remained(_add_sleep_event, &e); sched(pg); g = *pg;
// NOTE: Don't return inside `for' iteration since we need to update |seed| bool stolen = false; size_t s = *seed; auto& groups = tag_group(tag); for (size_t i = 0; i < ngroup; ++i, s += offset) { TaskGroup* g = groups[s % ngroup]; // g is possibly NULL because of concurrent _destroy_group if (g) { if (g->_rq.steal(tid)) { stolen = true; break; } if (g->_remote_rq.pop(tid)) { stolen = true; break; } } } *seed = s; return stolen; }
channel通道,是网络中相当常见的概念。在brpc中,专门指客户端发起的通道。A Channel represents a communication line to one server or multiple servers which can be used to call that Server’s services. Servers may be running on other machines. Normally, you should not call a Channel directly, but instead construct a stub Service wrapping it.
channel 主要是两个接口
Channel::Init 创建channel,发起连接
int Init(const char* server_addr_and_port, const ChannelOptions* options); 初始化channel,也就是创建发起一个连接
intChannel::Init(constchar* server_addr, int port, const ChannelOptions* options){ GlobalInitializeOrDie(); butil::EndPoint point; const AdaptiveProtocolType& ptype = (options ? options->protocol : _options.protocol); const Protocol* protocol = FindProtocol(ptype); if (protocol == NULL || !protocol->support_client()) { LOG(ERROR) << "Channel does not support the protocol"; return-1; } if (protocol->parse_server_address != NULL) { if (!protocol->parse_server_address(&point, server_addr)) { LOG(ERROR) << "Fail to parse address=`" << server_addr << '\''; return-1; } point.port = port; } else { if (str2endpoint(server_addr, port, &point) != 0 && hostname2endpoint(server_addr, port, &point) != 0) { return-1; } } returnInitSingle(point, server_addr, options, port); }
Channel::InitSingle 函数
a. GlobalInitializeOrDie(); b. InitChannelOptions(options) c. CreateSocketSSLContext(_options, &ssl_ctx) d. SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
voidAcceptor::OnNewConnections(Socket* acception){ int progress = Socket::PROGRESS_INIT; do { OnNewConnectionsUntilEAGAIN(acception); if (acception->Failed()) { return; } } while (acception->MoreReadEvents(&progress)); }
intEventDispatcher::Start(constbthread_attr_t* consumer_thread_attr){ if (_event_dispatcher_fd < 0) { LOG(FATAL) << "epoll was not created"; return-1; } if (_tid != 0) { LOG(FATAL) << "Already started this dispatcher(" << this << ") in bthread=" << _tid; return-1; } // int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, this); if (rc) { LOG(FATAL) << "Fail to create epoll thread: " << berror(rc); return-1; } return0; }
// bthread运行EventDispatcher::Run voidEventDispatcher::Run(){ while (!_stop) { epoll_event e[32]; constint n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1); if (_stop) { break; } if (n < 0) { if (EINTR == errno) { // We've checked _stop, no wake-up will be missed. continue; } PLOG(FATAL) << "Fail to epoll_wait epfd=" << _event_dispatcher_fd; break; } for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) ) { // We don't care about the return value. CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr); } } for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { // We don't care about the return value. CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr); } } } }