inputstream_impl.h 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef FS_INPUTSTREAM_IMPL_H_
  19. #define FS_INPUTSTREAM_IMPL_H_
#include "reader/block_reader.h"
#include "common/continuation/asio.h"

#include <algorithm>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>
#include <vector>
  24. namespace hdfs {
  25. struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
  26. typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
  27. HandshakeContinuation(Reader *reader, const std::string &client_name,
  28. const hadoop::common::TokenProto *token,
  29. const hadoop::hdfs::ExtendedBlockProto *block,
  30. uint64_t length, uint64_t offset)
  31. : reader_(reader)
  32. , client_name_(client_name)
  33. , length_(length)
  34. , offset_(offset)
  35. {
  36. if (token) {
  37. token_.reset(new hadoop::common::TokenProto());
  38. token_->CheckTypeAndMergeFrom(*token);
  39. }
  40. block_.CheckTypeAndMergeFrom(*block);
  41. }
  42. virtual void Run(const Next& next) override {
  43. reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next);
  44. }
  45. private:
  46. Reader *reader_;
  47. const std::string client_name_;
  48. std::unique_ptr<hadoop::common::TokenProto> token_;
  49. hadoop::hdfs::ExtendedBlockProto block_;
  50. uint64_t length_;
  51. uint64_t offset_;
  52. };
  53. template<class MutableBufferSequence>
  54. struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
  55. typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
  56. ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
  57. size_t *transferred)
  58. : reader_(reader)
  59. , buffer_(buffer)
  60. , buffer_size_(asio::buffer_size(buffer))
  61. , transferred_(transferred)
  62. {}
  63. virtual void Run(const Next& next) override {
  64. *transferred_ = 0;
  65. next_ = next;
  66. OnReadData(Status::OK(), 0);
  67. }
  68. private:
  69. Reader *reader_;
  70. MutableBufferSequence buffer_;
  71. const size_t buffer_size_;
  72. size_t *transferred_;
  73. std::function<void(const Status &)> next_;
  74. void OnReadData(const Status &status, size_t transferred) {
  75. using std::placeholders::_1;
  76. using std::placeholders::_2;
  77. *transferred_ += transferred;
  78. if (!status.ok()) {
  79. next_(status);
  80. } else if (*transferred_ >= buffer_size_) {
  81. next_(status);
  82. } else {
  83. reader_->async_read_some(
  84. asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
  85. std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
  86. }
  87. }
  88. };
  89. template<class MutableBufferSequence, class Handler>
  90. void InputStreamImpl::AsyncPreadSome(
  91. size_t offset, const MutableBufferSequence &buffers,
  92. const Handler &handler) {
  93. using ::hadoop::hdfs::LocatedBlockProto;
  94. namespace ip = ::asio::ip;
  95. using ::asio::ip::tcp;
  96. auto it = std::find_if(
  97. blocks_.begin(), blocks_.end(),
  98. [offset](const LocatedBlockProto &p) {
  99. return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
  100. });
  101. if (it == blocks_.end()) {
  102. handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
  103. return;
  104. } else if (!it->locs_size()) {
  105. handler(Status::ResourceUnavailable("No datanodes available"), 0);
  106. return;
  107. }
  108. uint64_t offset_within_block = offset - it->offset();
  109. uint64_t size_within_block =
  110. std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
  111. struct State {
  112. std::unique_ptr<tcp::socket> conn;
  113. std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;
  114. LocatedBlockProto block;
  115. std::vector<tcp::endpoint> endpoints;
  116. size_t transferred;
  117. };
  118. auto m = continuation::Pipeline<State>::Create();
  119. auto &s = m->state();
  120. s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));
  121. s.reader = std::make_shared<RemoteBlockReader<tcp::socket>>(BlockReaderOptions(), s.conn.get());
  122. s.block = *it;
  123. for (auto &loc : it->locs()) {
  124. auto datanode = loc.id();
  125. s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport()));
  126. }
  127. m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end()))
  128. .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr,
  129. &s.block.b(), size_within_block, offset_within_block))
  130. .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(
  131. s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred));
  132. m->Run([handler](const Status &status, const State &state) {
  133. handler(status, state.transferred);
  134. });
  135. }
  136. }
  137. #endif