| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 | /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */#ifndef FS_INPUTSTREAM_IMPL_H_#define FS_INPUTSTREAM_IMPL_H_#include "reader/block_reader.h"#include "common/continuation/asio.h"#include <functional>#include <future>namespace hdfs {struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;  HandshakeContinuation(Reader *reader, const std::string &client_name,               const hadoop::common::TokenProto *token,               const hadoop::hdfs::ExtendedBlockProto *block,               uint64_t length, uint64_t offset)      : reader_(reader)      , client_name_(client_name)      , length_(length)      , offset_(offset)  {    if (token) {      token_.reset(new hadoop::common::TokenProto());      token_->CheckTypeAndMergeFrom(*token);    }    block_.CheckTypeAndMergeFrom(*block);  }  virtual void Run(const Next& next) override {    reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next);  } private:  Reader *reader_;  const std::string client_name_;  std::unique_ptr<hadoop::common::TokenProto> token_;  hadoop::hdfs::ExtendedBlockProto block_;  uint64_t length_;  uint64_t offset_;};template<class MutableBufferSequence>struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,                 size_t *transferred)      : reader_(reader)      , buffer_(buffer)      , buffer_size_(asio::buffer_size(buffer))      , transferred_(transferred)  {}  virtual void Run(const Next& next) override {    *transferred_ = 0;    next_ = next;    OnReadData(Status::OK(), 0);  } private:  Reader *reader_;  MutableBufferSequence buffer_;  const size_t buffer_size_;  size_t *transferred_;  std::function<void(const Status &)> next_;  void OnReadData(const Status &status, size_t transferred) {    using std::placeholders::_1;    using std::placeholders::_2;    *transferred_ += transferred;    if (!status.ok()) {      next_(status);    } else if (*transferred_ >= buffer_size_) {      next_(status);    } else {      reader_->async_read_some(          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));    }  }};template<class MutableBufferSequence, class Handler>void InputStreamImpl::AsyncPreadSome(    size_t offset, const MutableBufferSequence &buffers,    const Handler &handler) {  using ::hadoop::hdfs::LocatedBlockProto;  namespace ip = ::asio::ip;  using ::asio::ip::tcp;  auto it = std::find_if(      blocks_.begin(), blocks_.end(),      [offset](const LocatedBlockProto &p) {        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();      });  if (it == blocks_.end()) {    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);    return;  } else if (!it->locs_size()) {    handler(Status::ResourceUnavailable("No datanodes available"), 0);    return;  }  uint64_t offset_within_block = offset - it->offset();  uint64_t size_within_block =      std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));  struct State {    std::unique_ptr<tcp::socket> conn;    std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;    LocatedBlockProto block;    std::vector<tcp::endpoint> endpoints;    size_t transferred;  };  auto m = continuation::Pipeline<State>::Create();  auto &s = m->state();  s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));  s.reader = std::make_shared<RemoteBlockReader<tcp::socket>>(BlockReaderOptions(), s.conn.get());  s.block = *it;  for (auto &loc : it->locs()) {    auto datanode = loc.id();    s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport()));  }  m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end()))      .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr,                                   &s.block.b(), size_within_block, offset_within_block))      .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(          s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred));  m->Run([handler](const Status &status, const State &state) {      handler(status, state.transferred);    });}}#endif
 |