123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- #include <iostream>
- //
- // Created by Frapo on 2018/2/9.
- //
- #include "include/libhdfs++/hdfs.h"
- #include <asio/ip/tcp.hpp>
- #include <fs/filesystem.h>
- using namespace ::hdfs;
- using ::asio::ip::tcp;
- void print_buf(void * buf, size_t read_count);
- static IoService * ioService = NULL;
- int initialize(){
- ioService = new IoServiceImpl();
- auto func = std::bind(& IoService::Run, ioService);
- std::thread first(func);
- first.detach();
- }
- InputStream * open_hdfs_file(const char *file_path,const char *server, short unsigned int port){
- if(ioService == NULL){
- initialize();
- }
- FileSystem * fsptr;
- Status stat = FileSystem::New(ioService, server, port, &fsptr);
- if (!stat.ok()) {
- printf("[Exception] Main::read_a_file Unable to establish connection to server %s:%d", server, port);
- printf(stat.ToString().c_str());
- throw std::bad_exception();
- }
- InputStream * isptr;
- // 读取一次
- stat = fsptr->Open(file_path, &isptr);
- if (!stat.ok()) {
- printf("[Exception] Main::read_a_file Unable to open file %s", file_path);
- printf(stat.ToString().c_str());
- throw std::bad_exception();
- }
- return isptr;
- }
- /// Open a file to InputStream *.
- /// \param file_path
- /// \param server
- /// \param port
- /// \return
- int read_a_file(InputStream * isptr, void * buf, size_t file_size, int offset_in_trunk){
- //size_t file_size = 100;
- //int offset_in_trunk = 10;
- /*
- *
- * rand.nextBytes(contents);
- * OutputStream os = cluster.getFileSystem().create(new Path("/foo"))
- * os.write(contents);
- *
- * */
- size_t read_count = 0;
- isptr->PositionRead(buf, file_size, 0, &read_count);
- print_buf(buf,read_count);
- printf("read : %lu", read_count);
- free(buf);
- buf = malloc(file_size - offset_in_trunk);
- read_count = 0;
- isptr->PositionRead(buf, file_size - offset_in_trunk, offset_in_trunk, &read_count);
- print_buf(buf,read_count);
- printf("read : %lu", read_count);
- free(buf);
- return 0;
- }
- void print_buf(void * buf, size_t read_count){
- for(unsigned int indi = 0; indi < read_count ; indi++){
- putchar( *((char *) (buf) +indi));
- }
- free(buf);
- }
- int main(int argc, char **argv) {
- static_cast<void>(argc);
- static_cast<void>(argv);
- std::cout << "Hello, World!" << std::endl;
- //int PORT = 9000;
- std::string host = "117.107.241.78";
- std::string fn = "/a.txt";
- // getting size of file.
- int file_size = 1000;
- void *buf = malloc(file_size);
- InputStream *isptr;
- try {
- isptr = open_hdfs_file("/a.txt", "127.0.0.1", 9000);
- }catch(std::bad_exception ef){
- printf("\n\ncannot open hdfs file, exception what() : %s ",ef.what());
- isptr = 0;
- }
- if(isptr == NULL){
- return -101;
- }
- read_a_file(isptr, buf, 100, 50);
- }
- /**
- *
- * hdfsFS fs = hdfsConnect("default", 0);
- const char* writePath = "/tmp/testfile.txt";
- hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
- if(!writeFile) {
- fprintf(stderr, "Failed to open %s for writing!\n", writePath);
- exit(-1);
- }
- char* buffer = "Hello, World!";
- tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
- if (hdfsFlush(fs, writeFile)) {
- fprintf(stderr, "Failed to 'flush' %s\n", writePath);
- exit(-1);
- }
- hdfsCloseFile(fs, writeFile);* /
- * */
|