以mysql为例
创建表项
-- Orders table: one row per order placed by a customer.
-- `date` is backtick-quoted because it is a keyword in MySQL
-- (non-reserved, but quoting makes the intent explicit).
CREATE TABLE orders (
    id          INT NOT NULL AUTO_INCREMENT,
    customer_id INT,
    order_ref   VARCHAR(127),
    `date`      TIMESTAMP,
    PRIMARY KEY (id)
);
插入测试记录
-- Seed data: 12 orders, 4 per customer (customer_id 1-3).
-- FIX: the original 12 single-row INSERTs had no terminating ';', so they
-- could not be executed as a script. A single multi-row INSERT is also one
-- round trip instead of twelve; auto-increment ids (1-12) are assigned in
-- the same order as before.
INSERT INTO orders (customer_id, order_ref, `date`) VALUES
    (1, 'ref#001', '2018-08-27 01:02:21'),
    (1, 'ref#002', '2018-08-27 01:03:50'),
    (1, 'ref#003', '2018-08-27 01:05:00'),
    (1, 'ref#004', '2018-08-27 02:06:20'),
    (2, 'ref#005', '2018-08-27 02:10:00'),
    (2, 'ref#006', '2018-08-26 14:10:00'),
    (2, 'ref#007', '2018-08-26 02:10:00'),
    (2, 'ref#008', '2018-08-27 13:11:00'),
    (3, 'ref#009', '2018-08-25 02:10:00'),
    (3, 'ref#010', '2018-08-26 15:07:00'),
    (3, 'ref#011', '2018-08-26 16:00:00'),
    (3, 'ref#012', '2018-08-25 08:11:00');
查询数据库中记录,如下
id | customer_id | order_ref | date |
---|---|---|---|
1 | 1 | ref#001 | 2018-08-27 01:02:21 |
2 | 1 | ref#002 | 2018-08-27 01:03:50 |
3 | 1 | ref#003 | 2018-08-27 01:05:00 |
4 | 1 | ref#004 | 2018-08-27 02:06:20 |
5 | 2 | ref#005 | 2018-08-27 02:10:00 |
6 | 2 | ref#006 | 2018-08-26 14:10:00 |
7 | 2 | ref#007 | 2018-08-26 02:10:00 |
8 | 2 | ref#008 | 2018-08-27 13:11:00 |
9 | 3 | ref#009 | 2018-08-25 02:10:00 |
10 | 3 | ref#010 | 2018-08-26 15:07:00 |
11 | 3 | ref#011 | 2018-08-26 16:00:00 |
12 | 3 | ref#012 | 2018-08-25 08:11:00 |
需求1:
按照customer_id进行分组,且每组按照date排序,然后获取每组中时间最新的一条记录
结果数据如下
id | customer_id | order_ref | date |
---|---|---|---|
4 | 1 | ref#004 | 2018-08-27 02:06:20 |
8 | 2 | ref#008 | 2018-08-27 13:11:00 |
11 | 3 | ref#011 | 2018-08-26 16:00:00 |
子查询方式:
-- Latest order per customer: keep a row only when no other row of the same
-- customer has a strictly later date.
-- NOTE: if two rows of one customer share the same maximum date, both are
-- returned (no tie-breaking).
SELECT o.*
FROM orders o
WHERE NOT EXISTS (
    SELECT 1
    FROM orders newer
    WHERE newer.customer_id = o.customer_id
      AND newer.date > o.date
);
如果用来分组的字段值不是很多,也可以用如下union all的方式
-- Alternative when the distinct customer_id values are few and known in
-- advance: fetch the newest order for each customer and concatenate.
-- UNION ALL (not UNION) is correct: the arms are disjoint, so the implicit
-- DISTINCT of UNION would only add cost.
-- FIX: in MySQL, a SELECT that carries its own ORDER BY / LIMIT inside a
-- UNION must be wrapped in parentheses; the original unparenthesized form
-- is a syntax error (the ORDER BY would bind to the whole UNION result).
(
    SELECT *
    FROM orders
    WHERE customer_id = [the_customer_id]
    ORDER BY `date` DESC
    LIMIT 1
)
UNION ALL
(
    SELECT *
    FROM orders
    WHERE customer_id = [the_customer_id]
    ORDER BY `date` DESC
    LIMIT 1
)
-- ... repeat one parenthesized arm per customer_id value ...
需求2:
按照customer_id进行分组,且每组按照date排序,然后获取每组中时间最新的n条记录(本例中n=2)
结果数据如下
id | customer_id | order_ref | date |
---|---|---|---|
4 | 1 | ref#004 | 2018-08-27 02:06:20 |
3 | 1 | ref#003 | 2018-08-27 01:05:00 |
8 | 2 | ref#008 | 2018-08-27 13:11:00 |
5 | 2 | ref#005 | 2018-08-27 02:10:00 |
11 | 3 | ref#011 | 2018-08-26 16:00:00 |
10 | 3 | ref#010 | 2018-08-26 15:07:00 |
子查询方式:
-- Top-2 orders per customer: keep a row when fewer than 2 rows of the same
-- customer have a strictly later date (i.e. the row's rank within its
-- customer is 1 or 2). To fetch top-n, replace the literal 2 with n.
-- The correlated COUNT makes this O(rows^2) per customer; fine for small
-- tables, prefer window functions at scale.
SELECT o.*
FROM orders o
WHERE (
    SELECT COUNT(*)
    FROM orders newer
    WHERE newer.customer_id = o.customer_id
      AND newer.date > o.date
) < 2
ORDER BY o.customer_id, o.date DESC;
用户变量方式:
这种方式用用户变量模拟了开窗函数 ROW_NUMBER() 的分组编号效果(MySQL 8.0 开始原生支持开窗函数)
-- Emulate ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY date DESC)
-- with user variables, for MySQL versions before 8.0 (no window functions).
SELECT
*
FROM
-- sub_table walks the rows in (customer_id, date DESC) order and numbers
-- them, restarting the counter whenever customer_id changes.
(SELECT
customer_id,
order_ref,
date,
-- At this point @var_customer_id still holds the PREVIOUS row's
-- customer_id, because it is only reassigned in the next select item.
-- On the very first row @var_customer_id is NULL, so the comparison
-- evaluates to UNKNOWN and the ELSE branch initializes rn to 1.
@rn:=CASE
WHEN @var_customer_id = customer_id THEN @rn + 1
ELSE 1
END AS rn,
@var_customer_id:=customer_id
FROM
-- One-row derived table initializes both variables for this statement,
-- avoiding a separate SET and stale values from earlier statements.
(SELECT @var_customer_id:=NULL, @rn:=NULL) vars, orders
ORDER BY customer_id , date DESC) as sub_table
WHERE
-- Keep the 2 newest rows per customer; change the literal for top-n.
rn <= 2
ORDER BY customer_id , date DESC
-- CAVEAT: this relies on left-to-right evaluation of select items and on
-- the derived table preserving its ORDER BY; both behaviors are
-- undocumented/deprecated in MySQL — prefer ROW_NUMBER() on MySQL >= 8.0.
目前的spark sql是支持开窗函数特性的,下面以spark sql操作hbase表为例,描述下在spark sql中进行分组查询top n记录的过程
在hbase shell中创建表并插入记录
# Create the HBase table (single column family 'cf1') and load the same 12
# orders used in the MySQL example, keyed 10001..10012.
# Row keys are quoted as strings: HBase keys are raw bytes, and quoting
# avoids relying on the shell's implicit integer-to-string conversion.
create 'ORDERS', {NAME=>'cf1'}
put 'ORDERS', '10001', 'cf1:customer_id', 1
put 'ORDERS', '10001', 'cf1:order_ref', 'ref#001'
# FIX: was '2018-08-27 01:03:50', which duplicated row 10002's timestamp
# and disagreed with the MySQL seed data for ref#001.
put 'ORDERS', '10001', 'cf1:date', '2018-08-27 01:02:21'
put 'ORDERS', '10002', 'cf1:customer_id', 1
put 'ORDERS', '10002', 'cf1:order_ref', 'ref#002'
put 'ORDERS', '10002', 'cf1:date', '2018-08-27 01:03:50'
put 'ORDERS', '10003', 'cf1:customer_id', 1
put 'ORDERS', '10003', 'cf1:order_ref', 'ref#003'
put 'ORDERS', '10003', 'cf1:date', '2018-08-27 01:05:00'
put 'ORDERS', '10004', 'cf1:customer_id', 1
put 'ORDERS', '10004', 'cf1:order_ref', 'ref#004'
put 'ORDERS', '10004', 'cf1:date', '2018-08-27 02:06:20'
put 'ORDERS', '10005', 'cf1:customer_id', 2
put 'ORDERS', '10005', 'cf1:order_ref', 'ref#005'
put 'ORDERS', '10005', 'cf1:date', '2018-08-27 02:10:00'
put 'ORDERS', '10006', 'cf1:customer_id', 2
put 'ORDERS', '10006', 'cf1:order_ref', 'ref#006'
put 'ORDERS', '10006', 'cf1:date', '2018-08-26 14:10:00'
put 'ORDERS', '10007', 'cf1:customer_id', 2
put 'ORDERS', '10007', 'cf1:order_ref', 'ref#007'
put 'ORDERS', '10007', 'cf1:date', '2018-08-26 02:10:00'
put 'ORDERS', '10008', 'cf1:customer_id', 2
put 'ORDERS', '10008', 'cf1:order_ref', 'ref#008'
put 'ORDERS', '10008', 'cf1:date', '2018-08-27 13:11:00'
put 'ORDERS', '10009', 'cf1:customer_id', 3
put 'ORDERS', '10009', 'cf1:order_ref', 'ref#009'
put 'ORDERS', '10009', 'cf1:date', '2018-08-25 02:10:00'
put 'ORDERS', '10010', 'cf1:customer_id', 3
put 'ORDERS', '10010', 'cf1:order_ref', 'ref#010'
put 'ORDERS', '10010', 'cf1:date', '2018-08-26 15:07:00'
put 'ORDERS', '10011', 'cf1:customer_id', 3
put 'ORDERS', '10011', 'cf1:order_ref', 'ref#011'
put 'ORDERS', '10011', 'cf1:date', '2018-08-26 16:00:00'
put 'ORDERS', '10012', 'cf1:customer_id', 3
put 'ORDERS', '10012', 'cf1:order_ref', 'ref#012'
put 'ORDERS', '10012', 'cf1:date', '2018-08-25 08:11:00'
由于环境中的spark sql是通过hive的metadata来操作表项的,所以创建hive的外部表去映射hbase表项
-- External Hive table over the HBase 'ORDERS' table, so that Spark SQL
-- (which resolves tables through the Hive metastore) can query it.
-- `date` must be backtick-quoted: it is a reserved word in HiveQL.
CREATE EXTERNAL TABLE ORDERS(id int,customer_id int,order_ref string,`date` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
-- Mapping is positional: :key -> id, then one cf1 qualifier per column,
-- in the same order as the column list above.
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:customer_id,cf1:order_ref,cf1:date")
TBLPROPERTIES ("hbase.table.name" = "ORDERS");
在spark执行查询语句如下
spark-sql> SELECT *
> FROM
> (
> SELECT t.*,
> ROW_NUMBER() over(partition by customer_id
> order by date DESC) RowNum
> FROM ORDERS t
> ) tbl
> WHERE RowNum <= 2;
10004 1 ref#004 2018-08-27 02:06:20 1
10003 1 ref#003 2018-08-27 01:05:00 2
10011 3 ref#011 2018-08-26 16:00:00 1
10010 3 ref#010 2018-08-26 15:07:00 2
10008 2 ref#008 2018-08-27 13:11:00 1
10005 2 ref#005 2018-08-27 02:10:00 2
Time taken: 10.423 seconds, Fetched 6 row(s)