/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "rpc_engine.h" #include "RpcHeader.pb.h" #include "ProtobufRpcEngine.pb.h" #include "IpcConnectionContext.pb.h" #include "common/util.h" #include #include #include namespace hdfs { namespace pb = ::google::protobuf; namespace pbio = ::google::protobuf::io; using namespace ::hadoop::common; using namespace ::std::placeholders; static void ConstructPacket(std::string *res, std::initializer_list headers, const std::string *request) { int len = 0; std::for_each( headers.begin(), headers.end(), [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); if (request) { len += pbio::CodedOutputStream::VarintSize32(request->size()) + request->size(); } int net_len = htonl(len); res->reserve(res->size() + sizeof(net_len) + len); pbio::StringOutputStream ss(res); pbio::CodedOutputStream os(&ss); os.WriteRaw(reinterpret_cast(&net_len), sizeof(net_len)); uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); assert(buf && "Cannot allocate memory"); std::for_each( headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); buf = v->SerializeWithCachedSizesToArray(buf); }); if (request) { buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); buf = os.WriteStringToArray(*request, buf); } } static void SetRequestHeader(RpcEngine *engine, int call_id, const std::string &method_name, RpcRequestHeaderProto *rpc_header, RequestHeaderProto *req_header) { rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); rpc_header->set_callid(call_id); rpc_header->set_clientid(engine->client_name()); req_header->set_methodname(method_name); req_header->set_declaringclassprotocolname(engine->protocol_name()); req_header->set_clientprotocolversion(engine->protocol_version()); } RpcConnection::~RpcConnection() {} RpcConnection::Request::Request(RpcConnection *parent, const std::string &method_name, const std::string &request, Handler &&handler) : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), handler_(std::move(handler)) { RpcRequestHeaderProto rpc_header; RequestHeaderProto req_header; SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, &req_header); ConstructPacket(&payload_, {&rpc_header, &req_header}, &request); } RpcConnection::Request::Request(RpcConnection *parent, const std::string &method_name, const pb::MessageLite *request, Handler &&handler) : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), handler_(std::move(handler)) { RpcRequestHeaderProto rpc_header; RequestHeaderProto req_header; SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, &req_header); ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr); } void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is, const Status &status) { handler_(is, status); } RpcConnection::RpcConnection(RpcEngine *engine) : engine_(engine), resp_state_(kReadLength), resp_length_(0) {} ::asio::io_service &RpcConnection::io_service() { return engine_->io_service(); } void RpcConnection::Start() { io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, ::asio::error_code(), 0)); } void RpcConnection::FlushPendingRequests() { io_service().post([this]() { if (!request_over_the_wire_) { OnSendCompleted(::asio::error_code(), 0); } }); } void RpcConnection::HandleRpcResponse(const std::vector &data) { /* assumed to be called from a context that has already acquired the * engine_state_lock */ pbio::ArrayInputStream ar(&data[0], data.size()); pbio::CodedInputStream in(&ar); in.PushLimit(data.size()); RpcResponseHeaderProto h; ReadDelimitedPBMessage(&in, &h); auto it = requests_on_fly_.find(h.callid()); if (it == requests_on_fly_.end()) { // TODO: out of line RPC request assert(false && "Out of line request with unknown call id"); } auto req = it->second; requests_on_fly_.erase(it); Status stat; if (h.has_exceptionclassname()) { stat = Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); } req->OnResponseArrived(&in, stat); } std::shared_ptr RpcConnection::PrepareHandshakePacket() { static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c', RpcEngine::kRpcVersion, 0, 0}; auto res = std::make_shared(kHandshakeHeader, sizeof(kHandshakeHeader)); RpcRequestHeaderProto h; h.set_rpckind(RPC_PROTOCOL_BUFFER); h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); h.set_callid(RpcEngine::kCallIdConnectionContext); h.set_clientid(engine_->client_name()); IpcConnectionContextProto handshake; handshake.set_protocol(engine_->protocol_name()); ConstructPacket(res.get(), {&h, &handshake}, nullptr); return res; } void RpcConnection::AsyncRpc( const std::string &method_name, const ::google::protobuf::MessageLite *req, std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) { std::lock_guard state_lock(engine_state_lock_); auto wrapped_handler = [resp, handler](pbio::CodedInputStream *is, const Status &status) { if (status.ok()) { ReadDelimitedPBMessage(is, resp.get()); } handler(status); }; auto r = std::make_shared(this, method_name, req, std::move(wrapped_handler)); pending_requests_.push_back(r); FlushPendingRequests(); } void RpcConnection::AsyncRawRpc(const std::string &method_name, const std::string &req, std::shared_ptr resp, Callback &&handler) { std::lock_guard state_lock(engine_state_lock_); auto wrapped_handler = [this, resp, handler](pbio::CodedInputStream *is, const Status &status) { if (status.ok()) { uint32_t size = 0; is->ReadVarint32(&size); auto limit = is->PushLimit(size); is->ReadString(resp.get(), limit); is->PopLimit(limit); } handler(status); }; auto r = std::make_shared(this, method_name, req, std::move(wrapped_handler)); pending_requests_.push_back(r); FlushPendingRequests(); } }