Getting the first or top N records from each group in SQL

Using MySQL as an example, create the table:

create table orders (
    id INT NOT NULL AUTO_INCREMENT,
    customer_id INT,
    order_ref VARCHAR(127),
    date TIMESTAMP,
    PRIMARY KEY (id)
);

Insert test rows:

insert into orders(customer_id,order_ref,date) values(1,'ref#001','2018-08-27 01:02:21');
insert into orders(customer_id,order_ref,date) values(1,'ref#002','2018-08-27 01:03:50');
insert into orders(customer_id,order_ref,date) values(1,'ref#003','2018-08-27 01:05:00');
insert into orders(customer_id,order_ref,date) values(1,'ref#004','2018-08-27 02:06:20');
insert into orders(customer_id,order_ref,date) values(2,'ref#005','2018-08-27 02:10:00');
insert into orders(customer_id,order_ref,date) values(2,'ref#006','2018-08-26 14:10:00');
insert into orders(customer_id,order_ref,date) values(2,'ref#007','2018-08-26 02:10:00');
insert into orders(customer_id,order_ref,date) values(2,'ref#008','2018-08-27 13:11:00');
insert into orders(customer_id,order_ref,date) values(3,'ref#009','2018-08-25 02:10:00');
insert into orders(customer_id,order_ref,date) values(3,'ref#010','2018-08-26 15:07:00');
insert into orders(customer_id,order_ref,date) values(3,'ref#011','2018-08-26 16:00:00');
insert into orders(customer_id,order_ref,date) values(3,'ref#012','2018-08-25 08:11:00');

Querying the table now returns the following rows:

id customer_id order_ref date
1 1 ref#001 2018-08-27 01:02:21
2 1 ref#002 2018-08-27 01:03:50
3 1 ref#003 2018-08-27 01:05:00
4 1 ref#004 2018-08-27 02:06:20
5 2 ref#005 2018-08-27 02:10:00
6 2 ref#006 2018-08-26 14:10:00
7 2 ref#007 2018-08-26 02:10:00
8 2 ref#008 2018-08-27 13:11:00
9 3 ref#009 2018-08-25 02:10:00
10 3 ref#010 2018-08-26 15:07:00
11 3 ref#011 2018-08-26 16:00:00
12 3 ref#012 2018-08-25 08:11:00

Requirement 1:

Group by customer_id, sort each group by date, and return the most recent record from each group.

The result should be:

id customer_id order_ref date
4 1 ref#004 2018-08-27 02:06:20
8 2 ref#008 2018-08-27 13:11:00
11 3 ref#011 2018-08-26 16:00:00

Subquery approach:

SELECT
    * 
FROM
    orders a 
WHERE
    NOT EXISTS ( SELECT 1 FROM orders WHERE a.customer_id = orders.customer_id AND a.date < orders.date );
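The NOT EXISTS trick is portable: a row survives the filter only when no other row of the same customer has a strictly later date. A minimal sketch that reproduces the article's data and query, using Python's sqlite3 as a stand-in engine (an assumption; the article itself uses MySQL, but this query runs unchanged on both):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, "
             "customer_id INT, order_ref TEXT, date TEXT)")
conn.executemany(
    "INSERT INTO orders(customer_id, order_ref, date) VALUES (?,?,?)", [
        (1, 'ref#001', '2018-08-27 01:02:21'), (1, 'ref#002', '2018-08-27 01:03:50'),
        (1, 'ref#003', '2018-08-27 01:05:00'), (1, 'ref#004', '2018-08-27 02:06:20'),
        (2, 'ref#005', '2018-08-27 02:10:00'), (2, 'ref#006', '2018-08-26 14:10:00'),
        (2, 'ref#007', '2018-08-26 02:10:00'), (2, 'ref#008', '2018-08-27 13:11:00'),
        (3, 'ref#009', '2018-08-25 02:10:00'), (3, 'ref#010', '2018-08-26 15:07:00'),
        (3, 'ref#011', '2018-08-26 16:00:00'), (3, 'ref#012', '2018-08-25 08:11:00')])

# Keep a row only if no row of the same customer has a later date.
latest = conn.execute("""
    SELECT * FROM orders a
    WHERE NOT EXISTS (
        SELECT 1 FROM orders
        WHERE a.customer_id = orders.customer_id AND a.date < orders.date)
    ORDER BY a.customer_id""").fetchall()
print([r[2] for r in latest])  # ['ref#004', 'ref#008', 'ref#011']
```

One caveat: with tied timestamps inside a group, this filter can return more than one row per group, since no row then has a strictly later date.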

If the grouping column takes only a few distinct values, a UNION ALL over per-group queries also works. Note that in MySQL each branch must be wrapped in parentheses; otherwise the per-branch ORDER BY/LIMIT is a syntax error:

( SELECT
    *
  FROM
    orders
  WHERE
    customer_id=[the_customer_id]
  ORDER BY
    date DESC
  LIMIT 1 )
UNION ALL
( SELECT
    *
  FROM
    orders
  WHERE
    customer_id=[the_customer_id]
  ORDER BY
    date DESC
  LIMIT 1 )
......
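Since the branches are identical except for the customer id, they can be generated programmatically over the known grouping values. A sketch in Python's sqlite3 (an assumed stand-in for MySQL; SQLite rejects parenthesized UNION members, so each branch is wrapped in a derived table instead, which is also valid MySQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, "
             "customer_id INT, order_ref TEXT, date TEXT)")
conn.executemany(
    "INSERT INTO orders(customer_id, order_ref, date) VALUES (?,?,?)", [
        (1, 'ref#001', '2018-08-27 01:02:21'), (1, 'ref#002', '2018-08-27 01:03:50'),
        (1, 'ref#003', '2018-08-27 01:05:00'), (1, 'ref#004', '2018-08-27 02:06:20'),
        (2, 'ref#005', '2018-08-27 02:10:00'), (2, 'ref#006', '2018-08-26 14:10:00'),
        (2, 'ref#007', '2018-08-26 02:10:00'), (2, 'ref#008', '2018-08-27 13:11:00'),
        (3, 'ref#009', '2018-08-25 02:10:00'), (3, 'ref#010', '2018-08-26 15:07:00'),
        (3, 'ref#011', '2018-08-26 16:00:00'), (3, 'ref#012', '2018-08-25 08:11:00')])

customer_ids = [1, 2, 3]  # the known, small set of grouping values
# One "latest order for this customer" branch per id, glued with UNION ALL.
branch = ("SELECT * FROM (SELECT * FROM orders WHERE customer_id = ? "
          "ORDER BY date DESC LIMIT 1)")
sql = " UNION ALL ".join([branch] * len(customer_ids))
latest = conn.execute(sql, customer_ids).fetchall()
print(sorted(r[2] for r in latest))  # ['ref#004', 'ref#008', 'ref#011']
```

This only makes sense when the set of grouping values is small and known in advance; it does one index-friendly probe per group instead of a correlated scan.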

Requirement 2:

Group by customer_id, sort each group by date, and return the n most recent records from each group.

The result (for n = 2) should be:

id customer_id order_ref date
4 1 ref#004 2018-08-27 02:06:20
3 1 ref#003 2018-08-27 01:05:00
8 2 ref#008 2018-08-27 13:11:00
5 2 ref#005 2018-08-27 02:10:00
11 3 ref#011 2018-08-26 16:00:00
10 3 ref#010 2018-08-26 15:07:00

Subquery approach:

SELECT
    *
FROM
    orders a
WHERE
    2 > ( SELECT count(*) FROM orders WHERE customer_id = a.customer_id AND date > a.date )
ORDER BY
    a.customer_id,
    a.date DESC;
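The counting subquery keeps a row exactly when fewer than n rows of the same group have a later date, i.e. the row ranks in the group's top n. The same sketch in Python's sqlite3 (an assumed stand-in for MySQL), with n = 2:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, "
             "customer_id INT, order_ref TEXT, date TEXT)")
conn.executemany(
    "INSERT INTO orders(customer_id, order_ref, date) VALUES (?,?,?)", [
        (1, 'ref#001', '2018-08-27 01:02:21'), (1, 'ref#002', '2018-08-27 01:03:50'),
        (1, 'ref#003', '2018-08-27 01:05:00'), (1, 'ref#004', '2018-08-27 02:06:20'),
        (2, 'ref#005', '2018-08-27 02:10:00'), (2, 'ref#006', '2018-08-26 14:10:00'),
        (2, 'ref#007', '2018-08-26 02:10:00'), (2, 'ref#008', '2018-08-27 13:11:00'),
        (3, 'ref#009', '2018-08-25 02:10:00'), (3, 'ref#010', '2018-08-26 15:07:00'),
        (3, 'ref#011', '2018-08-26 16:00:00'), (3, 'ref#012', '2018-08-25 08:11:00')])

# A row is kept when fewer than 2 rows of its group have a later date.
topn = conn.execute("""
    SELECT * FROM orders a
    WHERE 2 > (SELECT count(*) FROM orders
               WHERE customer_id = a.customer_id AND date > a.date)
    ORDER BY a.customer_id, a.date DESC""").fetchall()
print([r[2] for r in topn])
# ['ref#004', 'ref#003', 'ref#008', 'ref#005', 'ref#011', 'ref#010']
```

The correlated count is O(rows per group) for every row, so this is fine for small groups but degrades on large ones; the window-function form below the user-variable section scales better.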

User-variable approach:
This simulates a window function (window functions are natively supported from MySQL 8.0). Be aware that assigning and reading user variables within the same SELECT relies on an undocumented evaluation order and is deprecated in MySQL 8.0; prefer real window functions where available.

SELECT
    * 
FROM
    (SELECT
            customer_id,
            order_ref,
            date,
            @rn:=CASE
                WHEN @var_customer_id = customer_id THEN @rn + 1
                ELSE 1
            END AS rn,
            @var_customer_id:=customer_id
    FROM
        (SELECT @var_customer_id:=NULL, @rn:=NULL) vars, orders
    ORDER BY customer_id , date DESC) as sub_table
WHERE
    rn <= 2
ORDER BY customer_id , date DESC;
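Conceptually, the user-variable query is a sequential scan over the sorted rows with a counter that resets whenever customer_id changes. The same logic in plain Python (a sketch of what @rn and @var_customer_id compute, not MySQL itself):

```python
rows = [
    (1, 'ref#001', '2018-08-27 01:02:21'), (1, 'ref#002', '2018-08-27 01:03:50'),
    (1, 'ref#003', '2018-08-27 01:05:00'), (1, 'ref#004', '2018-08-27 02:06:20'),
    (2, 'ref#005', '2018-08-27 02:10:00'), (2, 'ref#006', '2018-08-26 14:10:00'),
    (2, 'ref#007', '2018-08-26 02:10:00'), (2, 'ref#008', '2018-08-27 13:11:00'),
    (3, 'ref#009', '2018-08-25 02:10:00'), (3, 'ref#010', '2018-08-26 15:07:00'),
    (3, 'ref#011', '2018-08-26 16:00:00'), (3, 'ref#012', '2018-08-25 08:11:00')]

# Same order as the SQL: customer_id ASC, date DESC.
# Two stable sorts: secondary key first, then primary key.
rows.sort(key=lambda r: r[2], reverse=True)
rows.sort(key=lambda r: r[0])

top2, prev_customer, rn = [], None, 0
for customer_id, order_ref, date in rows:
    # Mirrors @rn := CASE WHEN @var_customer_id = customer_id
    #                     THEN @rn + 1 ELSE 1 END
    rn = rn + 1 if customer_id == prev_customer else 1
    prev_customer = customer_id   # mirrors @var_customer_id := customer_id
    if rn <= 2:
        top2.append(order_ref)

print(top2)
# ['ref#004', 'ref#003', 'ref#008', 'ref#005', 'ref#011', 'ref#010']
```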

Spark SQL already supports window functions. The rest of this article uses Spark SQL over an HBase table to show how to query the top n records per group.

Create the table and insert rows in the hbase shell:

create 'ORDERS',  {NAME=>'cf1'}
put 'ORDERS', 10001,'cf1:customer_id', 1
put 'ORDERS', 10001,'cf1:order_ref','ref#001'
put 'ORDERS', 10001,'cf1:date','2018-08-27 01:02:21'
put 'ORDERS', 10002,'cf1:customer_id', 1
put 'ORDERS', 10002,'cf1:order_ref','ref#002'
put 'ORDERS', 10002,'cf1:date','2018-08-27 01:03:50'
put 'ORDERS', 10003,'cf1:customer_id', 1
put 'ORDERS', 10003,'cf1:order_ref','ref#003'
put 'ORDERS', 10003,'cf1:date','2018-08-27 01:05:00'
put 'ORDERS', 10004,'cf1:customer_id', 1
put 'ORDERS', 10004,'cf1:order_ref','ref#004'
put 'ORDERS', 10004,'cf1:date','2018-08-27 02:06:20'
put 'ORDERS', 10005,'cf1:customer_id', 2
put 'ORDERS', 10005,'cf1:order_ref','ref#005'
put 'ORDERS', 10005,'cf1:date','2018-08-27 02:10:00'
put 'ORDERS', 10006,'cf1:customer_id', 2
put 'ORDERS', 10006,'cf1:order_ref','ref#006'
put 'ORDERS', 10006,'cf1:date','2018-08-26 14:10:00'
put 'ORDERS', 10007,'cf1:customer_id', 2
put 'ORDERS', 10007,'cf1:order_ref','ref#007'
put 'ORDERS', 10007,'cf1:date','2018-08-26 02:10:00'
put 'ORDERS', 10008,'cf1:customer_id', 2
put 'ORDERS', 10008,'cf1:order_ref','ref#008'
put 'ORDERS', 10008,'cf1:date','2018-08-27 13:11:00'
put 'ORDERS', 10009,'cf1:customer_id', 3
put 'ORDERS', 10009,'cf1:order_ref','ref#009'
put 'ORDERS', 10009,'cf1:date','2018-08-25 02:10:00'
put 'ORDERS', 10010,'cf1:customer_id', 3
put 'ORDERS', 10010,'cf1:order_ref','ref#010'
put 'ORDERS', 10010,'cf1:date','2018-08-26 15:07:00'
put 'ORDERS', 10011,'cf1:customer_id', 3
put 'ORDERS', 10011,'cf1:order_ref','ref#011'
put 'ORDERS', 10011,'cf1:date','2018-08-26 16:00:00'
put 'ORDERS', 10012,'cf1:customer_id', 3
put 'ORDERS', 10012,'cf1:order_ref','ref#012'
put 'ORDERS', 10012,'cf1:date','2018-08-25 08:11:00'

Because Spark SQL in this environment accesses tables through the Hive metastore, create a Hive external table that maps onto the HBase table:

CREATE EXTERNAL TABLE ORDERS(id int,customer_id int,order_ref string,`date` string)   
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'   
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:customer_id,cf1:order_ref,cf1:date")   
TBLPROPERTIES ("hbase.table.name" = "ORDERS");

Run the query in spark-sql:

spark-sql> SELECT *
         >  FROM 
         >  (
         >    SELECT t.*,
         >           ROW_NUMBER() over(partition by customer_id 
         >                             order by     date DESC) RowNum
         >    FROM ORDERS t
         >  ) tbl
         >  WHERE RowNum <= 2;
10004   1   ref#004 2018-08-27 02:06:20 1
10003   1   ref#003 2018-08-27 01:05:00 2
10011   3   ref#011 2018-08-26 16:00:00 1
10010   3   ref#010 2018-08-26 15:07:00 2
10008   2   ref#008 2018-08-27 13:11:00 1
10005   2   ref#005 2018-08-27 02:10:00 2
Time taken: 10.423 seconds, Fetched 6 row(s)
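This ROW_NUMBER query is not Spark-specific; any engine with window functions accepts it. A final sketch using Python's sqlite3 (assumes a SQLite build of 3.25 or newer, the version that added window functions; an explicit outer ORDER BY is added to make the output order deterministic):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, "
             "customer_id INT, order_ref TEXT, date TEXT)")
conn.executemany(
    "INSERT INTO orders(customer_id, order_ref, date) VALUES (?,?,?)", [
        (1, 'ref#001', '2018-08-27 01:02:21'), (1, 'ref#002', '2018-08-27 01:03:50'),
        (1, 'ref#003', '2018-08-27 01:05:00'), (1, 'ref#004', '2018-08-27 02:06:20'),
        (2, 'ref#005', '2018-08-27 02:10:00'), (2, 'ref#006', '2018-08-26 14:10:00'),
        (2, 'ref#007', '2018-08-26 02:10:00'), (2, 'ref#008', '2018-08-27 13:11:00'),
        (3, 'ref#009', '2018-08-25 02:10:00'), (3, 'ref#010', '2018-08-26 15:07:00'),
        (3, 'ref#011', '2018-08-26 16:00:00'), (3, 'ref#012', '2018-08-25 08:11:00')])

# Number the rows within each customer group, newest first, keep the top 2.
top2 = conn.execute("""
    SELECT * FROM (
        SELECT t.*,
               ROW_NUMBER() OVER (PARTITION BY customer_id
                                  ORDER BY date DESC) AS RowNum
        FROM orders t) tbl
    WHERE RowNum <= 2
    ORDER BY customer_id, date DESC""").fetchall()
print([r[2] for r in top2])
# ['ref#004', 'ref#003', 'ref#008', 'ref#005', 'ref#011', 'ref#010']
```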