block_reader.cc 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. #include "libhdfs++/hdfs.h"
  2. #include "reader/block_reader.h"
  3. #include <asio/ip/tcp.hpp>
  4. using namespace ::hdfs;
  5. using ::asio::ip::tcp;
  6. std::shared_ptr<RemoteBlockReader<tcp::socket>> RemoteBlockReader_create(tcp::socket *conn) {
  7. auto self = new std::shared_ptr<RemoteBlockReader<tcp::socket> >
  8. (new RemoteBlockReader<tcp::socket>(BlockReaderOptions(), conn));
  9. return *self;
  10. }
  11. void RemoteBlockReader_destroy(std::shared_ptr<RemoteBlockReader<tcp::socket> >* handle) {
  12. delete handle;
  13. }
  14. std::string RemoteBlockReader_connect(
  15. const std::shared_ptr<RemoteBlockReader<tcp::socket>> &self,
  16. const std::string &client_name,
  17. const std::string &tokenProto,
  18. const std::string &blockProto,
  19. long length, long offset) {
  20. hadoop::common::TokenProto token;
  21. if (!tokenProto.empty()) {
  22. token.ParseFromString(tokenProto);
  23. }
  24. hadoop::hdfs::ExtendedBlockProto blockpro;
  25. blockpro.ParseFromArray(blockProto.data(), static_cast<int>(blockProto.size()));
  26. Status stat = self->connect(client_name, & token, &blockpro, length, offset);
  27. return stat.ToString();
  28. }
  29. size_t RemoteBlockReader_readSome(std::shared_ptr<RemoteBlockReader<tcp::socket>> self,
  30. void *buffer, int position, int limit, std::vector<Status> statuses) {
  31. Status stat;
  32. if (!buffer || position > limit) {
  33. stat = Status::InvalidArgument("Invalid buffer");
  34. statuses.push_back(stat);
  35. return 0;
  36. }
  37. size_t transferred = self->read_some(
  38. asio::buffer(buffer + position, limit - position),
  39. &stat
  40. );
  41. if (!stat.ok()) {
  42. statuses.push_back(stat);
  43. }
  44. return transferred;
  45. }