package com.peony.netty.util; import com.peony.util.StringUtils; import com.peony.util.SystemProps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class HbaseFileSystem { private static final Logger logger = LoggerFactory.getLogger(HbaseFileSystem.class); private static final int BATCH = SystemProps.getIntProperty("batch", 100); private static final int CACAHING = SystemProps.getIntProperty("caching", 1000); private String clientPort = "2181"; private String quorum = "192.168.3.37,192.168.3.38,192.168.3.39"; // private String master = "192.168.10.15"; private String master = "60000"; // private String parent = "/hbase/hbs-aqtb2h1b"; private String parent = "/hbase/hbs-gxjpkf8z"; private byte[] familyName = Bytes.toBytes("content"); private Connection connection; private HbaseFileSystem() { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.property.clientPort", SystemProps.getProperty("clientPort", clientPort)); config.set("hbase.zookeeper.quorum", SystemProps.getProperty("quorum", quorum)); config.set("hbase.master", SystemProps.getProperty("master", master)); config.set("zookeeper.znode.parent", SystemProps.getProperty("parent", parent)); try { connection = ConnectionFactory.createConnection(config); } catch (IOException e) { logger.error("创建到hbase的连接失败", e); } } public byte[] get(String id, TableName tableName) throws Exception { Table table = connection.getTable(tableName); Get get = new Get(Bytes.toBytes(id)); Result rs = table.get(get); if (rs.isEmpty()) { return null; } byte[] data = rs.getValue(familyName, null); if (data == null) { return null; } return data; } public String get(String id, TableName tableName, boolean depress) throws Exception { byte[] bytes = get(id, tableName); if (bytes == null) { return null; } if (depress) { return new String(StringUtils.depress(bytes), StringUtils.CHARSET); } else { return StringUtils.encodeBase64String(bytes); } } public ResultScanner scan(TableName tableName) throws Exception { Table table = connection.getTable(tableName); Scan scan = new Scan(); scan.addFamily(familyName); scan.setMaxVersions(1); scan.setCaching(CACAHING); scan.setBatch(BATCH); return table.getScanner(scan); } public boolean add(String id, String content, boolean replace, TableName tableName) throws Exception { Table table = connection.getTable(tableName); Get get = new Get(Bytes.toBytes(id)); get.addColumn(familyName, null); if (!replace && table.exists(get)) { logger.debug("file exists in hbase, id: {}", id); return false; } Put put = new Put(Bytes.toBytes(id)); byte[] data = org.apache.commons.codec.binary.StringUtils.getBytesUnchecked(content, StringUtils.CHARSET); put.addColumn(familyName, null, data); table.put(put); return true; } public boolean delete(String id, TableName tableName) throws IOException { List ids = new ArrayList<>(); ids.add(id); return delete(ids, tableName); } public boolean delete(List ids, TableName tableName) throws IOException { Table table = connection.getTable(tableName); List deletes = new ArrayList<>(); for (String id : ids) { Delete delete = new Delete(Bytes.toBytes(id)); delete.addFamily(familyName); deletes.add(delete); } table.delete(deletes); return true; } private static final HbaseFileSystem instance = new HbaseFileSystem(); public static HbaseFileSystem getInstance() { return instance; } }