rpc_engine.cc 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "rpc_engine.h"
  19. #include "rpc_connection.h"
  20. #include "common/util.h"
  21. #include <openssl/rand.h>
  22. #include <sstream>
  23. #include <future>
  24. namespace hdfs {
  25. RpcEngine::RpcEngine(::asio::io_service *io_service,
  26. const std::string &client_name, const char *protocol_name,
  27. int protocol_version)
  28. : io_service_(io_service), client_name_(client_name),
  29. protocol_name_(protocol_name), protocol_version_(protocol_version),
  30. call_id_(0)
  31. , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))
  32. {}
  33. Status
  34. RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) {
  35. using ::asio::ip::tcp;
  36. auto stat = std::make_shared<std::promise<Status>>();
  37. std::future<Status> future(stat->get_future());
  38. conn_->Connect(servers, [this, stat](const Status &status) {
  39. if (!status.ok()) {
  40. stat->set_value(status);
  41. return;
  42. }
  43. conn_->Handshake(
  44. [this, stat](const Status &status) { stat->set_value(status); });
  45. });
  46. return future.get();
  47. }
  48. void RpcEngine::Start() { conn_->Start(); }
  49. void RpcEngine::Shutdown() {
  50. io_service_->post([this]() { conn_->Shutdown(); });
  51. }
  52. void RpcEngine::AsyncRpc(
  53. const std::string &method_name, const ::google::protobuf::MessageLite *req,
  54. const std::shared_ptr<::google::protobuf::MessageLite> &resp,
  55. std::function<void(const Status &)> &&handler) {
  56. conn_->AsyncRpc(method_name, req, resp, std::move(handler));
  57. }
  58. Status
  59. RpcEngine::Rpc(const std::string &method_name,
  60. const ::google::protobuf::MessageLite *req,
  61. const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
  62. auto stat = std::make_shared<std::promise<Status>>();
  63. std::future<Status> future(stat->get_future());
  64. AsyncRpc(method_name, req, resp,
  65. [stat](const Status &status) { stat->set_value(status); });
  66. return future.get();
  67. }
  68. Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
  69. std::shared_ptr<std::string> resp) {
  70. auto stat = std::make_shared<std::promise<Status>>();
  71. std::future<Status> future(stat->get_future());
  72. conn_->AsyncRawRpc(method_name, req, resp,
  73. [stat](const Status &status) { stat->set_value(status); });
  74. return future.get();
  75. }
  76. std::string RpcEngine::GetRandomClientName() {
  77. unsigned char buf[6] = {
  78. 0,
  79. };
  80. RAND_pseudo_bytes(buf, sizeof(buf));
  81. std::stringstream ss;
  82. ss << "libhdfs++_"
  83. << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
  84. return ss.str();
  85. }
  86. }