rpc_engine.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIB_RPC_RPC_ENGINE_H_
  19. #define LIB_RPC_RPC_ENGINE_H_
  20. #include "libhdfs++/status.h"
  21. #include <google/protobuf/message_lite.h>
  22. #include <asio/ip/tcp.hpp>
  23. #include <asio/deadline_timer.hpp>
  24. #include <atomic>
  25. #include <memory>
  26. #include <unordered_map>
  27. #include <vector>
  28. #include <mutex>
  29. namespace hdfs {
  30. class RpcEngine;
  31. class RpcConnection {
  32. public:
  33. typedef std::function<void(const Status &)> Callback;
  34. virtual ~RpcConnection();
  35. RpcConnection(RpcEngine *engine);
  36. virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
  37. Callback &&handler) = 0;
  38. virtual void Handshake(Callback &&handler) = 0;
  39. virtual void Shutdown() = 0;
  40. void Start();
  41. void AsyncRpc(const std::string &method_name,
  42. const ::google::protobuf::MessageLite *req,
  43. std::shared_ptr<::google::protobuf::MessageLite> resp,
  44. Callback &&handler);
  45. void AsyncRawRpc(const std::string &method_name, const std::string &request,
  46. std::shared_ptr<std::string> resp, Callback &&handler);
  47. protected:
  48. RpcEngine *const engine_;
  49. virtual void OnSendCompleted(const ::asio::error_code &ec,
  50. size_t transferred) = 0;
  51. virtual void OnRecvCompleted(const ::asio::error_code &ec,
  52. size_t transferred) = 0;
  53. ::asio::io_service &io_service();
  54. std::shared_ptr<std::string> PrepareHandshakePacket();
  55. static std::string
  56. SerializeRpcRequest(const std::string &method_name,
  57. const ::google::protobuf::MessageLite *req);
  58. void HandleRpcResponse(const std::vector<char> &data);
  59. void FlushPendingRequests();
  60. enum ResponseState {
  61. kReadLength,
  62. kReadContent,
  63. kParseResponse,
  64. } resp_state_;
  65. unsigned resp_length_;
  66. std::vector<char> resp_data_;
  67. class Request {
  68. public:
  69. typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
  70. const Status &status)> Handler;
  71. Request(RpcConnection *parent, const std::string &method_name,
  72. const std::string &request, Handler &&callback);
  73. Request(RpcConnection *parent, const std::string &method_name,
  74. const ::google::protobuf::MessageLite *request, Handler &&callback);
  75. int call_id() const { return call_id_; }
  76. ::asio::deadline_timer &timer() { return timer_; }
  77. const std::string &payload() const { return payload_; }
  78. void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
  79. const Status &status);
  80. private:
  81. const int call_id_;
  82. ::asio::deadline_timer timer_;
  83. std::string payload_;
  84. Handler handler_;
  85. };
  86. // The request being sent over the wire
  87. std::shared_ptr<Request> request_over_the_wire_;
  88. // Requests to be sent over the wire
  89. std::vector<std::shared_ptr<Request>> pending_requests_;
  90. // Requests that are waiting for responses
  91. std::unordered_map<int, std::shared_ptr<Request>> requests_on_fly_;
  92. // Lock for mutable parts of this class that need to be thread safe
  93. std::mutex engine_state_lock_;
  94. };
  95. class RpcEngine {
  96. public:
  97. enum { kRpcVersion = 9 };
  98. enum {
  99. kCallIdAuthorizationFailed = -1,
  100. kCallIdInvalid = -2,
  101. kCallIdConnectionContext = -3,
  102. kCallIdPing = -4
  103. };
  104. RpcEngine(::asio::io_service *io_service, const std::string &client_name,
  105. const char *protocol_name, int protocol_version);
  106. void AsyncRpc(const std::string &method_name,
  107. const ::google::protobuf::MessageLite *req,
  108. const std::shared_ptr<::google::protobuf::MessageLite> &resp,
  109. std::function<void(const Status &)> &&handler);
  110. Status Rpc(const std::string &method_name,
  111. const ::google::protobuf::MessageLite *req,
  112. const std::shared_ptr<::google::protobuf::MessageLite> &resp);
  113. /**
  114. * Send raw bytes as RPC payload. This is intended to be used in JNI
  115. * bindings only.
  116. **/
  117. Status RawRpc(const std::string &method_name, const std::string &req,
  118. std::shared_ptr<std::string> resp);
  119. Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server);
  120. void Start();
  121. void Shutdown();
  122. int NextCallId() { return ++call_id_; }
  123. const std::string &client_name() const { return client_name_; }
  124. const std::string &protocol_name() const { return protocol_name_; }
  125. int protocol_version() const { return protocol_version_; }
  126. ::asio::io_service &io_service() { return *io_service_; }
  127. static std::string GetRandomClientName();
  128. private:
  129. ::asio::io_service *io_service_;
  130. const std::string client_name_;
  131. const std::string protocol_name_;
  132. const int protocol_version_;
  133. std::atomic_int call_id_;
  134. std::unique_ptr<RpcConnection> conn_;
  135. };
  136. }
  137. #endif