// remote_block_reader_test.cc — manual test client that reads one block from an HDFS datanode.
  1. #include "block_reader.h"
  2. #include <asio.hpp>
  3. #include <iostream>
  4. #include <string>
  5. int main(int argc, char *argv[]) {
  6. using namespace hdfs;
  7. using ::asio::ip::tcp;
  8. if (argc != 8)
  9. {
  10. std::cerr
  11. << "A simple client to read a block in the HDFS cluster.\n"
  12. << "Usage: " << argv[0] << " "
  13. << "<poolid> <blockid> <genstamp> <size> <offset> <dnhost> <dnport>\n";
  14. return 1;
  15. }
  16. asio::io_service io_service;
  17. hadoop::hdfs::ExtendedBlockProto block;
  18. block.set_poolid(argv[1]);
  19. block.set_blockid(std::stol(argv[2]));
  20. block.set_generationstamp(std::stol(argv[3]));
  21. size_t size = std::stol(argv[4]);
  22. size_t offset = std::stol(argv[5]);
  23. tcp::resolver resolver(io_service);
  24. tcp::resolver::query query(tcp::v4(), argv[6], argv[7]);
  25. tcp::resolver::iterator iterator = resolver.resolve(query);
  26. std::shared_ptr<tcp::socket> s(new tcp::socket(io_service));
  27. asio::connect(*s.get(), iterator);
  28. BlockReaderOptions options;
  29. auto reader = std::make_shared<RemoteBlockReader<tcp::socket> >(options, s.get());
  30. std::unique_ptr<char[]> buf(new char[size]);
  31. reader->async_connect("libhdfs++", nullptr, &block, size, offset, [&buf,reader,size](const Status &status) {
  32. if (!status.ok()) {
  33. std::cerr << "Error:" << status.code() << " " << status.ToString() << std::endl;
  34. } else {
  35. reader->async_read_some(asio::buffer(buf.get(), size), [&buf,size](const Status &status, size_t transferred) {
  36. buf[std::min(transferred, size - 1)] = 0;
  37. std::cerr << "Done:" << status.code()
  38. << " transferred = " << transferred << "\n"
  39. << buf.get() << std::endl;
  40. });
  41. }
  42. });
  43. io_service.run();
  44. return 0;
  45. }