12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- #include "libhdfs++/hdfs.h"
- #include "reader/block_reader.h"
- #include <asio/ip/tcp.hpp>
- using namespace ::hdfs;
- using ::asio::ip::tcp;
- std::shared_ptr<RemoteBlockReader<tcp::socket>> RemoteBlockReader_create(tcp::socket *conn) {
- auto self = new std::shared_ptr<RemoteBlockReader<tcp::socket> >
- (new RemoteBlockReader<tcp::socket>(BlockReaderOptions(), conn));
- return *self;
- }
- void RemoteBlockReader_destroy(std::shared_ptr<RemoteBlockReader<tcp::socket> >* handle) {
- delete handle;
- }
- std::string RemoteBlockReader_connect(
- const std::shared_ptr<RemoteBlockReader<tcp::socket>> &self,
- const std::string &client_name,
- const std::string &tokenProto,
- const std::string &blockProto,
- long length, long offset) {
- hadoop::common::TokenProto token;
- if (!tokenProto.empty()) {
- token.ParseFromString(tokenProto);
- }
- hadoop::hdfs::ExtendedBlockProto blockpro;
- blockpro.ParseFromArray(blockProto.data(), static_cast<int>(blockProto.size()));
- Status stat = self->connect(client_name, & token, &blockpro, length, offset);
- return stat.ToString();
- }
- size_t RemoteBlockReader_readSome(std::shared_ptr<RemoteBlockReader<tcp::socket>> self,
- void *buffer, int position, int limit, std::vector<Status> statuses) {
- Status stat;
- if (!buffer || position > limit) {
- stat = Status::InvalidArgument("Invalid buffer");
- statuses.push_back(stat);
- return 0;
- }
- size_t transferred = self->read_some(
- asio::buffer(buffer + position, limit - position),
- &stat
- );
- if (!stat.ok()) {
- statuses.push_back(stat);
- }
- return transferred;
- }
|