1.需求
- 微博内容的浏览、数据库表设计
- 用户社交体现:关注用户,取关用户
- 拉取关注的人的微博内容
2.功能接口设计
- 创建命名空间以及表名的定义
- 创建微博内容表
- 创建用户关系表
- 创建微博收件箱表(存放用户所关注的人发布的微博的rowkey)
- 发布微博内容
- 添加关注用户
- 移除(取关)用户
- 获取关注的人的微博内容
- 测试
微博内容表
![微博内容表](image.png)
(表名 weibo:content;rowkey 为 `uid_发布时间戳`;列族 info 下的列 content 存放微博正文,保留 1 个版本)
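用户关系表
![用户关系表](image.png)
(表名 weibo:relations;rowkey 为用户 uid;列族 attends 的列名为该用户关注的人的 uid,列族 fans 的列名为该用户粉丝的 uid,均保留 1 个版本)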
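微博收件箱表
![微博收件箱表](image.png)
(表名 weibo:receive_content_email;rowkey 为接收者 uid;列族 info 的列名为被关注者 uid,值为该条微博在内容表中的 rowkey,版本号即微博发布时间戳,最多保留 1000 个版本)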
3.创建Message实体类
/**
 * One weibo post as shown in a user's feed: who published it, when, and its text.
 * The getters and setters are required by callers that populate and print messages.
 */
public class Message {
    /** User id of the publisher. */
    private String uid;
    /** Publish time, kept as the string form of the epoch-millisecond timestamp. */
    private String timestamp;
    /** Text of the post. */
    private String content;

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{uid='" + uid + "', timestamp='" + timestamp + "', content='" + content + "'}";
    }
}
4. 创建表
/**
 * JUnit fixtures that create the HBase schema for the weibo demo: the {@code weibo} namespace
 * plus the content, relations and inbox tables. Each test creates one object on the cluster
 * configured in {@link #beforeInit()}; failures are printed rather than rethrown.
 */
public class WeiBoTest {
    /** Client configuration shared by the tests; rebuilt in {@link #beforeInit()}. */
    public static Configuration conf;
    /** Weibo content table name. */
    public static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
    /** User relations table name. */
    public static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
    /** Weibo inbox table name. */
    public static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");

    /** HFile block size applied to every column family: 2097152 bytes (2 MB). */
    private static final int BLOCK_SIZE = 2097152;

    @Before
    public void beforeInit() {
        // Machine-specific local Hadoop directory for the client; adjust per workstation.
        System.setProperty("hadoop.home.dir", "D:\\program files\\hadoop-common-2.2.0-bin-master");
        // HBaseConfiguration.create() loads hbase-default/hbase-site, then we point at ZooKeeper.
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "47.104.129.190");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Creates the {@code weibo} namespace (comparable to a schema in a relational database),
     * tagged with creator and creation-time properties.
     */
    @Test
    public void createTableNamespace() {
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            NamespaceDescriptor weibo = NamespaceDescriptor
                    .create("weibo")
                    .addConfiguration("creator", "zhangsan")
                    .addConfiguration("create_time", System.currentTimeMillis() + "")
                    .build();
            admin.createNamespace(weibo);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Creates the weibo content table with a single-version {@code info} family. */
    @Test
    public void createTableContent() {
        HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
        content.addFamily(newFamily("info", 1));
        createTable(content);
    }

    /**
     * Creates the user relations table with two single-version families:
     * {@code attends} (users this user follows) and {@code fans} (users following this user).
     */
    @Test
    public void createTableRelations() {
        HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
        relations.addFamily(newFamily("attends", 1));
        relations.addFamily(newFamily("fans", 1));
        createTable(relations);
    }

    /**
     * Creates the weibo inbox table. Its {@code info} family keeps up to 1000 versions per cell
     * so one (receiver, author) cell can hold many post rowkeys.
     */
    @Test
    public void createTableReceiveContentEamil() {
        HTableDescriptor receiveContentEmail =
                new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
        receiveContentEmail.addFamily(newFamily("info", 1000));
        // NOTE(review): no code in this document reads or writes a "fans" family on the inbox
        // table; it looks copied from the relations table. Kept to preserve the schema — confirm.
        receiveContentEmail.addFamily(newFamily("fans", 1));
        createTable(receiveContentEmail);
    }

    /**
     * Builds a column family descriptor with the settings shared by every table here:
     * block cache on, {@link #BLOCK_SIZE} HFile block size, min and max versions both
     * set to {@code versions}.
     */
    private static HColumnDescriptor newFamily(String name, int versions) {
        HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes(name));
        family.setBlockCacheEnabled(true);
        // HFile block size (unit of read/caching), not the size of the block cache itself.
        family.setBlocksize(BLOCK_SIZE);
        family.setMaxVersions(versions);
        family.setMinVersions(versions);
        return family;
    }

    /**
     * Creates {@code table} on the cluster. The admin is always closed, even if creation
     * fails, and failures are printed instead of silently swallowed.
     */
    private static void createTable(HTableDescriptor table) {
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            admin.createTable(table);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
5. 创建接口类
/**
 * Weibo operations on HBase: publish a post, follow/unfollow users, and read the feed.
 *
 * <p>Data layout used by these methods:
 * <ul>
 *   <li>content table: rowkey {@code uid_timestamp}, column {@code info:content} holds the text;</li>
 *   <li>relations table: rowkey {@code uid}, families {@code attends} / {@code fans} whose
 *       qualifiers are the followed / following user ids;</li>
 *   <li>inbox table: rowkey is the receiver's uid, qualifier {@code info:<authorUid>}, value is
 *       the content rowkey, cell version is the post timestamp.</li>
 * </ul>
 * Each call opens its own connection and closes every table, scanner and connection it opens.
 */
public class WeiBoPublish {
    private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
    private static final byte[] FAMILY_ATTENDS = Bytes.toBytes("attends");
    private static final byte[] FAMILY_FANS = Bytes.toBytes("fans");
    private static final byte[] COLUMN_CONTENT = Bytes.toBytes("content");
    /** How many of the latest posts per followed user are read from the inbox. */
    private static final int INBOX_VERSIONS_PER_USER = 5;

    public static Configuration conf;

    static {
        // Machine-specific local Hadoop directory for the client; adjust per workstation.
        System.setProperty("hadoop.home.dir", "D:\\program files\\hadoop-common-2.2.0-bin-master");
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "47.104.129.190");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Publishes a post.
     * a. Writes the post into the content table under rowkey {@code uid_timestamp}.
     * b. Pushes that rowkey into the inbox of every fan of {@code uid}.
     * Failures are printed, not rethrown.
     *
     * @param uid     publisher's user id
     * @param content post text
     */
    public void publishContent(String uid, String content) {
        try (HConnection connection = HConnectionManager.createConnection(conf)) {
            long timestamp = System.currentTimeMillis();
            String rowKey = uid + "_" + timestamp;

            // a. Store the post; the cell version is the publish timestamp.
            try (HTableInterface contentTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_CONTENT))) {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.add(FAMILY_INFO, COLUMN_CONTENT, timestamp, Bytes.toBytes(content));
                contentTBL.put(put);
            }

            // b.1 Read the publisher's fans (qualifiers of the "fans" family).
            List<byte[]> fans = new ArrayList<>();
            try (HTableInterface relationsTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RELATIONS))) {
                Get get = new Get(Bytes.toBytes(uid));
                get.addFamily(FAMILY_FANS);
                for (Cell cell : cellsOf(relationsTBL.get(get))) {
                    fans.add(CellUtil.cloneQualifier(cell));
                }
            }
            if (fans.isEmpty()) {
                return;
            }

            // b.2 Push the post rowkey into each fan's inbox, versioned by the publish time.
            try (HTableInterface recTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RECEIVE_CONTENT_EMAIL))) {
                List<Put> puts = new ArrayList<>(fans.size());
                for (byte[] fan : fans) {
                    Put fanPut = new Put(fan);
                    fanPut.add(FAMILY_INFO, Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
                    puts.add(fanPut);
                }
                recTBL.put(puts);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Makes {@code uid} follow each user in {@code attends}.
     * a. Adds every followed user to {@code uid}'s "attends" family.
     * b. Adds {@code uid} to every followed user's "fans" family.
     * c. Copies the rowkeys of all existing posts of the followed users into {@code uid}'s inbox.
     * Invalid arguments (null/empty) are ignored; I/O failures are printed.
     *
     * @param uid     the user who follows
     * @param attends the users to follow
     */
    public void addAttends(String uid, String... attends) {
        if (attends == null || attends.length <= 0 || uid == null || uid.length() <= 0) {
            return;
        }
        try (HConnection connection = HConnectionManager.createConnection(conf)) {
            // a + b. Update both directions of the relation in a single batch.
            try (HTableInterface relationsTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RELATIONS))) {
                List<Put> puts = new ArrayList<>();
                Put attendPut = new Put(Bytes.toBytes(uid));
                for (String attend : attends) {
                    attendPut.add(FAMILY_ATTENDS, Bytes.toBytes(attend), Bytes.toBytes(attend));
                    Put fansPut = new Put(Bytes.toBytes(attend));
                    fansPut.add(FAMILY_FANS, Bytes.toBytes(uid), Bytes.toBytes(uid));
                    puts.add(fansPut);
                }
                puts.add(attendPut);
                relationsTBL.put(puts);
            }

            // c.1 Collect the rowkeys of every post by every followed user.
            List<byte[]> rowkeys = new ArrayList<>();
            try (HTableInterface contentTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_CONTENT))) {
                for (String attend : attends) {
                    rowkeys.addAll(scanPostRowkeys(contentTBL, attend));
                }
            }
            if (rowkeys.isEmpty()) {
                return;
            }

            // c.2 Write them into the follower's inbox: qualifier = author uid, version = post time.
            try (HTableInterface recTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RECEIVE_CONTENT_EMAIL))) {
                List<Put> recPuts = new ArrayList<>(rowkeys.size());
                for (byte[] rk : rowkeys) {
                    String rowkey = Bytes.toString(rk);
                    // Rowkey is "uid_timestamp"; the timestamp never contains '_'.
                    int sep = rowkey.lastIndexOf('_');
                    String attendUID = rowkey.substring(0, sep);
                    long timestamp = Long.parseLong(rowkey.substring(sep + 1));
                    Put put = new Put(Bytes.toBytes(uid));
                    put.add(FAMILY_INFO, Bytes.toBytes(attendUID), timestamp, rk);
                    recPuts.add(put);
                }
                recTBL.put(recPuts);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Removes (unfollows) users.
     * a. Deletes each unfollowed user from {@code uid}'s "attends" family.
     * b. Deletes {@code uid} from each unfollowed user's "fans" family.
     * c. Deletes all inbox entries of the unfollowed users from {@code uid}'s inbox.
     * Invalid arguments (null/empty) are ignored; I/O failures are printed.
     *
     * @param uid     the user who unfollows
     * @param attends the users to unfollow
     */
    public void removeAttends(String uid, String... attends) {
        if (uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) {
            return;
        }
        try (HConnection connection = HConnectionManager.createConnection(conf)) {
            // a + b. Remove both directions of the relation in one batch.
            try (HTableInterface relationsTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RELATIONS))) {
                List<Delete> deletes = new ArrayList<>();
                Delete attendDelete = new Delete(Bytes.toBytes(uid));
                for (String attend : attends) {
                    attendDelete.deleteColumns(FAMILY_ATTENDS, Bytes.toBytes(attend));
                    Delete fansDelete = new Delete(Bytes.toBytes(attend));
                    fansDelete.deleteColumns(FAMILY_FANS, Bytes.toBytes(uid));
                    deletes.add(fansDelete);
                }
                deletes.add(attendDelete);
                relationsTBL.delete(deletes);
            }

            // c. deleteColumns (all versions), not deleteColumn (latest only): each inbox cell
            //    holds one version per post, so every version must go.
            try (HTableInterface recTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RECEIVE_CONTENT_EMAIL))) {
                Delete recDelete = new Delete(Bytes.toBytes(uid));
                for (String attend : attends) {
                    recDelete.deleteColumns(FAMILY_INFO, Bytes.toBytes(attend));
                }
                recTBL.delete(recDelete);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the feed of {@code uid}.
     * a. Reads post rowkeys from the inbox (up to {@link #INBOX_VERSIONS_PER_USER} per followed user).
     * b. Fetches those posts from the content table.
     * c. Wraps each post in a {@link Message}.
     *
     * @param uid the reader's user id
     * @return the messages (empty if the inbox is empty), or {@code null} if an I/O error occurred
     */
    public List<Message> getAttendsContent(String uid) {
        try (HConnection connection = HConnectionManager.createConnection(conf)) {
            // a. Collect post rowkeys from the inbox.
            List<byte[]> rowkeys = new ArrayList<>();
            try (HTableInterface recTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_RECEIVE_CONTENT_EMAIL))) {
                Get get = new Get(Bytes.toBytes(uid));
                get.setMaxVersions(INBOX_VERSIONS_PER_USER);
                for (Cell cell : cellsOf(recTBL.get(get))) {
                    rowkeys.add(CellUtil.cloneValue(cell));
                }
            }
            List<Message> messages = new ArrayList<>();
            if (rowkeys.isEmpty()) {
                return messages;
            }

            // b. Batch-fetch the posts.
            List<Get> gets = new ArrayList<>(rowkeys.size());
            for (byte[] rk : rowkeys) {
                gets.add(new Get(rk));
            }
            Result[] results;
            try (HTableInterface contentTBL = connection.getTable(TableName.valueOf(WeiBoTest.TABLE_CONTENT))) {
                results = contentTBL.get(gets);
            }

            // c. Rowkey "uid_timestamp" supplies publisher and time; the cell value is the text.
            for (Result res : results) {
                if (res == null || res.isEmpty()) {
                    continue; // post no longer exists in the content table
                }
                String rowKey = Bytes.toString(res.getRow());
                int sep = rowKey.lastIndexOf('_');
                String userid = rowKey.substring(0, sep);
                String timestamp = rowKey.substring(sep + 1);
                for (Cell cell : res.rawCells()) {
                    Message message = new Message();
                    message.setContent(Bytes.toString(CellUtil.cloneValue(cell)));
                    message.setTimestamp(timestamp);
                    message.setUid(userid);
                    messages.add(message);
                }
            }
            return messages;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Returns the rowkeys of all posts by {@code authorUid}. Rowkeys are "uid_timestamp", so a
     * range scan over ["uid_", "uid`") returns exactly the rows with prefix "uid_" ('`' is the
     * byte right after '_'). The scanner is always closed.
     */
    private static List<byte[]> scanPostRowkeys(HTableInterface contentTBL, String authorUid) throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(authorUid + "_"));
        scan.setStopRow(Bytes.toBytes(authorUid + "`"));
        List<byte[]> rowkeys = new ArrayList<>();
        try (ResultScanner scanner = contentTBL.getScanner(scan)) {
            for (Result r : scanner) {
                rowkeys.add(r.getRow());
            }
        }
        return rowkeys;
    }

    /**
     * Returns the cells of {@code result}, or an empty array when there are none.
     * {@code Result.rawCells()} may return null for an empty result (e.g. a Get on a missing
     * row), which would make a for-each loop throw NullPointerException.
     */
    private static Cell[] cellsOf(Result result) {
        if (result == null || result.isEmpty()) {
            return new Cell[0];
        }
        return result.rawCells();
    }
}
6.编写测试类
/**
 * Manual integration tests for {@link WeiBoPublish}; they talk to a live HBase cluster.
 * NOTE(review): the scenarios build on each other (publish → follow → unfollow → show), but
 * JUnit does not guarantee method order — run them individually in that order.
 */
public class WeiBoPublishTest {
    /** Publishes two posts as user 0001. */
    @Test
    public void testPublishContent() {
        WeiBoPublish wb = new WeiBoPublish();
        wb.publishContent("0001", "今天买了一包空气,送了点薯片,非常开心!!");
        wb.publishContent("0001", "今天天气不错");
    }

    /** Publishes a post each for 0008 and 0009, then makes 0001 follow both. */
    @Test
    public void testAddAttend() {
        WeiBoPublish wb = new WeiBoPublish();
        wb.publishContent("0008", "准备下课");
        wb.publishContent("0009", "准备关机");
        wb.addAttends("0001", "0008", "0009");
    }

    /** Makes 0001 unfollow 0008. */
    @Test
    public void testRemoveAttend() {
        WeiBoPublish wb = new WeiBoPublish();
        wb.removeAttends("0001", "0008");
    }

    /** Prints 0001's feed; fails clearly if the lookup reported an error (null). */
    @Test
    public void testShowMessage() {
        WeiBoPublish wb = new WeiBoPublish();
        List<Message> messages = wb.getAttendsContent("0001");
        // getAttendsContent returns null on I/O failure; fail with a clear message instead of an NPE.
        if (messages == null) {
            throw new AssertionError("getAttendsContent(\"0001\") returned null");
        }
        for (Message message : messages) {
            System.out.println(message);
        }
    }
}