protobuf.h 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #ifndef LIBHDFSPP_COMMON_COROUTINES_PROTOBUF_H_
  2. #define LIBHDFSPP_COMMON_COROUTINES_PROTOBUF_H_
  3. #include "common/util.h"
  4. #include <google/protobuf/message_lite.h>
  5. #include <google/protobuf/io/coded_stream.h>
  6. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  7. #include <cassert>
  8. namespace hdfs {
  9. namespace continuation {
  10. template <class Stream, size_t MaxMessageSize = 512>
  11. struct ReadDelimitedPBMessageContinuation : public Continuation {
  12. ReadDelimitedPBMessageContinuation(Stream *stream, ::google::protobuf::MessageLite *msg)
  13. : stream_(stream)
  14. , msg_(msg)
  15. {}
  16. virtual void Run(const Next& next) override {
  17. namespace pbio = google::protobuf::io;
  18. auto handler = [this,next](const asio::error_code &ec, size_t) {
  19. Status status;
  20. if (ec) {
  21. status = ToStatus(ec);
  22. } else {
  23. pbio::ArrayInputStream as(&buf_[0], buf_.size());
  24. pbio::CodedInputStream is(&as);
  25. uint32_t size = 0;
  26. bool v = is.ReadVarint32(&size);
  27. assert(v);
  28. is.PushLimit(size);
  29. msg_->Clear();
  30. v = msg_->MergeFromCodedStream(&is);
  31. assert(v);
  32. }
  33. next(status);
  34. };
  35. asio::async_read(
  36. *stream_, asio::buffer(buf_),
  37. std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
  38. std::placeholders::_1, std::placeholders::_2),
  39. handler);
  40. }
  41. private:
  42. size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
  43. if (ec) {
  44. return 0;
  45. }
  46. size_t offset = 0, len = 0;
  47. for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) {
  48. len = (len << 7) | (buf_[i] & 0x7f);
  49. if ((uint8_t)buf_.at(i) < 0x80) {
  50. offset = i + 1;
  51. break;
  52. }
  53. }
  54. assert (offset + len < buf_.size() && "Message is too big");
  55. return offset ? len + offset - transferred : 1;
  56. }
  57. Stream *stream_;
  58. ::google::protobuf::MessageLite *msg_;
  59. std::array<char, MaxMessageSize> buf_;
  60. };
  61. template <class Stream>
  62. struct WriteDelimitedPBMessageContinuation : Continuation {
  63. WriteDelimitedPBMessageContinuation(
  64. Stream *stream, const google::protobuf::MessageLite *msg)
  65. : stream_(stream)
  66. , msg_(msg)
  67. {}
  68. virtual void Run(const Next& next) override {
  69. namespace pbio = google::protobuf::io;
  70. int size = msg_->ByteSize();
  71. buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
  72. pbio::StringOutputStream ss(&buf_);
  73. pbio::CodedOutputStream os(&ss);
  74. os.WriteVarint32(size);
  75. msg_->SerializeToCodedStream(&os);
  76. write_coroutine_ = std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
  77. write_coroutine_->Run([next](const Status &stat) { next(stat); });
  78. }
  79. private:
  80. Stream *stream_;
  81. const google::protobuf::MessageLite * msg_;
  82. std::string buf_;
  83. std::shared_ptr<Continuation> write_coroutine_;
  84. };
  85. template<class Stream, size_t MaxMessageSize = 512>
  86. static inline Continuation*
  87. ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
  88. return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream, msg);
  89. }
  90. template<class Stream>
  91. static inline Continuation*
  92. WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
  93. return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
  94. }
  95. }
  96. }
  97. #endif