hdfscore.cpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  //
  // Created by Frapo on 2018/2/9.
  //
  #include <cstddef>
  #include <cstdio>
  #include <cstdlib>
  #include <exception>
  #include <functional>
  #include <iostream>
  #include <thread>
  #include "include/libhdfs++/hdfs.h"
  #include <asio/ip/tcp.hpp>
  #include <fs/filesystem.h>
  8. using namespace ::hdfs;
  9. using ::asio::ip::tcp;
  10. void print_buf(void * buf, size_t read_count);
  11. static IoService * ioService = NULL;
  12. int initialize(){
  13. ioService = new IoServiceImpl();
  14. auto func = std::bind(& IoService::Run, ioService);
  15. std::thread first(func);
  16. first.detach();
  17. }
  18. InputStream * open_hdfs_file(const char *file_path,const char *server, short unsigned int port){
  19. if(ioService == NULL){
  20. initialize();
  21. }
  22. FileSystem * fsptr;
  23. Status stat = FileSystem::New(ioService, server, port, &fsptr);
  24. if (!stat.ok()) {
  25. printf("[Exception] Main::read_a_file Unable to establish connection to server %s:%d", server, port);
  26. printf(stat.ToString().c_str());
  27. throw std::bad_exception();
  28. }
  29. InputStream * isptr;
  30. // 读取一次
  31. stat = fsptr->Open(file_path, &isptr);
  32. if (!stat.ok()) {
  33. printf("[Exception] Main::read_a_file Unable to open file %s", file_path);
  34. printf(stat.ToString().c_str());
  35. throw std::bad_exception();
  36. }
  37. return isptr;
  38. }
  39. /// Open a file to InputStream *.
  40. /// \param file_path
  41. /// \param server
  42. /// \param port
  43. /// \return
  44. int read_a_file(InputStream * isptr, void * buf, size_t file_size, int offset_in_trunk){
  45. //size_t file_size = 100;
  46. //int offset_in_trunk = 10;
  47. /*
  48. *
  49. * rand.nextBytes(contents);
  50. * OutputStream os = cluster.getFileSystem().create(new Path("/foo"))
  51. * os.write(contents);
  52. *
  53. * */
  54. size_t read_count = 0;
  55. isptr->PositionRead(buf, file_size, 0, &read_count);
  56. print_buf(buf,read_count);
  57. printf("read : %lu", read_count);
  58. free(buf);
  59. buf = malloc(file_size - offset_in_trunk);
  60. read_count = 0;
  61. isptr->PositionRead(buf, file_size - offset_in_trunk, offset_in_trunk, &read_count);
  62. print_buf(buf,read_count);
  63. printf("read : %lu", read_count);
  64. free(buf);
  65. return 0;
  66. }
  /// Writes the first read_count bytes of buf to stdout.
  ///
  /// The buffer is only read. It is deliberately not freed here: the caller owns it,
  /// and releasing it in a print helper turns the caller's own free() into a double free.
  /// \param buf        bytes to print; a null pointer prints nothing
  /// \param read_count number of bytes to print
  void print_buf(void * buf, size_t read_count){
    if (buf == nullptr || read_count == 0) {
      return;
    }
    // One bulk write; the count stays a size_t so large buffers cannot wrap an index.
    fwrite(buf, 1, read_count, stdout);
  }
  73. int main(int argc, char **argv) {
  74. static_cast<void>(argc);
  75. static_cast<void>(argv);
  76. std::cout << "Hello, World!" << std::endl;
  77. //int PORT = 9000;
  78. std::string host = "117.107.241.78";
  79. std::string fn = "/a.txt";
  80. // getting size of file.
  81. int file_size = 1000;
  82. void *buf = malloc(file_size);
  83. InputStream *isptr;
  84. try {
  85. isptr = open_hdfs_file("/a.txt", "127.0.0.1", 9000);
  86. }catch(std::bad_exception ef){
  87. printf("\n\ncannot open hdfs file, exception what() : %s ",ef.what());
  88. isptr = 0;
  89. }
  90. if(isptr == NULL){
  91. return -101;
  92. }
  93. read_a_file(isptr, buf, 100, 50);
  94. }
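  /**
   * Reference snippet (commented out, not compiled): writing a file with the
   * hdfsConnect / hdfsOpenFile / hdfsWrite C API.
   *
   * hdfsFS fs = hdfsConnect("default", 0);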
  98. const char* writePath = "/tmp/testfile.txt";
  99. hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
  100. if(!writeFile) {
  101. fprintf(stderr, "Failed to open %s for writing!\n", writePath);
  102. exit(-1);
  103. }
  104. char* buffer = "Hello, World!";
  105. tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
  106. if (hdfsFlush(fs, writeFile)) {
  107. fprintf(stderr, "Failed to 'flush' %s\n", writePath);
  108. exit(-1);
  109. }
  hdfsCloseFile(fs, writeFile);
  111. * */