多个Reducer操作同一张表可能出现的错误
假设有两个Reducer,R1和R2,都需要操作MySQL的Dimension表(封装成getDimensionId()
方法):首先查询该表中是否存在某一维度;如果该维度存在,则返回该维度的id值(id值为自增主键);如果该维度不存在,则先将该维度数据插入Dimension表,再返回该维度的id值。
在大数据量、频繁读取同一张表的情况下,R1、R2同时调用getDimensionId()方法,可能会出现以下错误:Dimension表中有可能存在两个id不同、但维度相同的数据。而我们需要的是:相同的维度数据只能出现一次。
解决办法:自定Hadoop RPC服务。在服务器端定义getDimensionId()方法,让R1和R2远程调用。
项目结构
接口定义
- 必须继承自VersionedProtocol
- 必须拥有
versionID
属性,且名称不能改变 - 定义getDimensionId()方法
/**
* 提供专门操作dimension表的接口
* 必须继承自VersionedProtocol
*
* @author liangxw
*/
public interface IDimensionHandler extends VersionedProtocol {
// 版本id,属性名称不能改变
long versionID = 1;
/**
* 根据dimension的value值获取id
* 如果数据库中有,那么直接返回。如果没有,那么进行插入后返回新的id值
*/
int getDimensionId(Dimension dimension) throws IOException;
}
服务器端
接口实现
- 实现了getDimensionId()方法
- 使用
LinkedHashMap
实现了服务器端缓存,提高读取效率
/**
* 实现了IDimensionHandler
*/
public class DimensionHandlerImpl implements IDimensionHandler {
private static final Logger logger = LoggerFactory.getLogger(DimensionHandlerImpl.class);
private ThreadLocal<Connection> threadLocal = new ThreadLocal<>();
// 用于服务器端保存维度和对应的id值
// 超过5000条时,删除旧条目
private Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {
private static final long serialVersionUID = -8113529501777031499L;
private static final int MAX_ENTRIES = 5000;
@Override
protected boolean removeEldestEntry(Entry<String, Integer> eldest) {
// 缓存容量, 如果这里返回true,那么删除最早加入的数据
return this.size() > MAX_ENTRIES;
}
};
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return IDimensionHandler.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
throws IOException {
// 返回空即可
return null;
}
@Override
public int getDimensionId(Dimension dimension) throws IOException {
// 将维度转换为字符串(维度的另一种描述方式)
String dimensionString = buildDimensionString(dimension);
// 查看服务器端sqlCacheMap缓存中是否有该维度对应的id值,有则返回
if (this.dimensionIdCache.containsKey(dimensionString)) {
return this.dimensionIdCache.get(dimensionString);
}
// 如果服务器端dimensionStringCache缓存中没有该维度的值
Connection conn;
try {
// 存放具体执行sql语句的数组
String[] sql;
if (dimension instanceof DateD) {
sql = this.buildDateSql();
} else if (dimension instanceof PlatformD) {
sql = this.buildPlatformSql();
} else if (dimension instanceof BrowserD) {
sql = this.buildBrowserSql();
} else {
throw new IOException("不支持此dimensionid的获取:" + dimension.getClass());
}
// 获取数据库连接
conn = this.getConnection();
int id;
synchronized (this) {
// 执行sql语句,获得id值
id = this.executeSql(conn, sql, dimension);
}
// 将该dimension和id值在服务器端进行缓存
this.dimensionIdCache.put(dimensionString, id);
return id;
} catch (Throwable e) {
logger.error("操作数据库出现异常", e);
throw new IOException(e);
}
}
/**
* 将dimension转换为字符串
* 就是简单的字符串拼接
*/
public static String buildDimensionString(Dimension dimension) {
StringBuilder sb = new StringBuilder();
if (dimension instanceof DateD) {
DateD date = (DateD) dimension;
sb.append("date_dimension")
.append(date.getYear())
.append(date.getSeason())
.append(date.getMonth())
.append(date.getWeek())
.append(date.getDay())
.append(date.getType());
} else if (dimension instanceof PlatformD) {
PlatformD platform = (PlatformD) dimension;
sb.append("platform_dimension")
.append(platform.getPlatformName())
.append(platform.getPlatformVersion());
} else if (dimension instanceof BrowserD) {
BrowserD browser = (BrowserD) dimension;
sb.append("browser_dimension")
.append(browser.getBrowser())
.append(browser.getBrowserVersion());
}
if (sb.length() == 0) {
throw new RuntimeException("无法将指定dimension转换为字符串:" + dimension.getClass());
}
return sb.toString();
}
/**
* 创建date dimension相关sql
*/
private String[] buildDateSql() {
String querySql = "SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?";
String insertSql = "INSERT INTO `dimension_date`(`year`, `season`, `month`, `week`, `day`, `type`, `calendar`) VALUES(?, ?, ?, ?, ?, ?, ?)";
return new String[]{querySql, insertSql};
}
/**
* 创建polatform dimension相关sql
*/
private String[] buildPlatformSql() {
String querySql = "SELECT `id` FROM `dimension_platform` WHERE `platform_name` = ? AND `platform_version` = ?";
String insertSql = "INSERT INTO `dimension_platform`(`platform_name`, `platform_version`) VALUES(?, ?)";
return new String[]{querySql, insertSql};
}
/**
* 创建browser dimension相关sql
*/
private String[] buildBrowserSql() {
String querySql = "SELECT `id` FROM `dimension_browser` WHERE `browser_name` = ? AND `browser_version` = ?";
String insertSql = "INSERT INTO `dimension_browser`(`browser_name`, `browser_version`) VALUES(?, ?)";
return new String[]{querySql, insertSql};
}
/**
* 连接数据库
*/
private Connection getConnection() throws SQLException {
Connection conn;
synchronized (this) {
conn = threadLocal.get();
try {
if (conn == null || conn.isClosed() || !conn.isValid(3)) {
conn = JdbcManager.getConnection(MYSQL_DATABASE);
}
} catch (SQLException e) {
try {
if (conn != null)
conn.close();
} catch (SQLException e1) {
// nothings
}
conn = JdbcManager.getConnection(MYSQL_DATABASE);
}
this.threadLocal.set(conn);
}
return conn;
}
/**
* 具体执行sql的方法
*/
@SuppressWarnings("resource")
private int executeSql(Connection conn, String[] sqls, Dimension dimension)
throws SQLException {
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
// 从数据库中查询dimension的id值
pstmt = conn.prepareStatement(sqls[0]); // 创建查询sql的pstmt对象
// 设置参数
this.setArgs(pstmt, dimension);
rs = pstmt.executeQuery();
if (rs.next()) {
// 查询到即返回
return rs.getInt(1);
}
// 如果该dimension在数据库中不存在,则首先插入该dimension
// 第二个参数表示是否返回自增长的主键的id
pstmt = conn.prepareStatement(sqls[1], Statement.RETURN_GENERATED_KEYS);
// 设置参数
this.setArgs(pstmt, dimension);
pstmt.executeUpdate();
// 获取返回的自增长的id
rs = pstmt.getGeneratedKeys();
if (rs.next()) {
return rs.getInt(1); // 获取返回值
}
} finally {
if (rs != null) {
try {
rs.close();
} catch (Throwable e) {
// nothing
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (Throwable e) {
// nothing
}
}
}
throw new RuntimeException("从数据库获取id失败");
}
/**
* 设置参数
*/
private void setArgs(PreparedStatement pstmt, Dimension dimension) throws SQLException {
int i = 0;
if (dimension instanceof DateD) {
DateD date = (DateD) dimension;
pstmt.setInt(++i, date.getYear());
pstmt.setInt(++i, date.getSeason());
pstmt.setInt(++i, date.getMonth());
pstmt.setInt(++i, date.getWeek());
pstmt.setInt(++i, date.getDay());
pstmt.setString(++i, date.getType());
pstmt.setDate(++i, new Date(date.getCalendar().getTime()));
} else if (dimension instanceof PlatformD) {
PlatformD platform = (PlatformD) dimension;
pstmt.setString(++i, platform.getPlatformName());
pstmt.setString(++i, platform.getPlatformVersion());
} else if (dimension instanceof BrowserD) {
BrowserD browser = (BrowserD) dimension;
pstmt.setString(++i, browser.getBrowser());
pstmt.setString(++i, browser.getBrowserVersion());
}
}
}
接口启动
- 将配置信息保存在HDFS上
客户端从HDFS上读取配置信息 - 添加关闭操作的钩子
/**
* IDimensionConverter服务接口的启动类
*
* @author liangxw
*/
public class DimensionHandlerServer {
private static final Logger logger = Logger.getLogger(DimensionHandlerServer.class);
private AtomicBoolean isRunning = new AtomicBoolean(false); // 标识是否启动
private Server server = null; // 服务对象
private Configuration conf = null;
// 保存在hdfs上的配置文件
private static final String CONFIG_SAVE_PATH = "/user/liangxw/rpc/config";
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://bigdata.liangxw.com:9000");
conf.set("hbase.zookeeper.quorum", "bigdata.liangxw.com:2181");
DimensionHandlerServer dhs = new DimensionHandlerServer(conf);
dhs.startServer();
}
// 添加一个钩子,进行关闭操作
private DimensionHandlerServer(Configuration conf) {
this.conf = conf;
Runtime.getRuntime().addShutdownHook(
new Thread(new Runnable() {
@Override
public void run() {
try {
DimensionHandlerServer.this.stopServer();
} catch (IOException e) {
// nothing
}
}
}));
}
/**
* 关闭服务
*/
private void stopServer() throws IOException {
logger.info("关闭服务开始");
try {
// 首先移除配置文件
this.removeRPCConf();
logger.info("删除配置文件");
} finally {
if (this.server != null) {
Server tmp = this.server;
this.server = null;
tmp.stop();
}
}
logger.info("关闭服务结束");
}
/**
* 启动服务
*/
private void startServer() {
logger.info("开始启动服务");
synchronized (this) {
if (isRunning.get()) {
// 启动完成
return;
}
try {
// 创建一个对象
IDimensionHandler converter = new DimensionHandlerImpl();
// 创建服务
this.server = new RPC.Builder(conf)
.setInstance(converter)
.setProtocol(IDimensionHandler.class)
.setVerbose(true)
.build();
// 获取ip地址和端口号
int port = this.server.getPort();
String address = InetAddress.getLocalHost().getHostAddress();
// 保存ip地址和端口号
this.saveRPCConf(address, port);
// 启动
this.server.start();
// 标识成功
isRunning.set(true);
logger.info("启动服务成功,监听ip地址:" + address + ",端口:" + port);
// this.server.stop();
} catch (Throwable e) {
isRunning.set(false);
logger.error("启动服务发生异常", e);
// 关闭可能异常创建的服务
try {
this.stopServer();
} catch (Throwable ee) {
// nothing
}
throw new RuntimeException("启动服务发生异常", e);
}
}
}
/**
* 保存监听信息
*/
private void saveRPCConf(String address, int port) throws IOException {
// 删除已经存在的
this.removeRPCConf();
// 进行数据输出操作
FileSystem fs = null;
BufferedWriter bw = null;
try {
fs = FileSystem.get(conf);
Path path = new Path(CONFIG_SAVE_PATH);
bw = new BufferedWriter(new OutputStreamWriter(fs.create(path)));
bw.write(address);
bw.newLine();
bw.write(String.valueOf(port));
} finally {
if (bw != null) {
try {
bw.close();
} catch (IOException e) {
// nothing
}
}
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
// nothing
}
}
}
}
/**
* 删除监听信息
*/
private void removeRPCConf() throws IOException {
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
Path path = new Path(CONFIG_SAVE_PATH);
if (fs.exists(path)) {
// 存在,则删除
fs.delete(path, true);
}
} finally {
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
// nothing
}
}
}
}
}
客户端
创建内部代理类,增加客户端缓存功能(客户端缓存中查询不到时,再去服务器端查询)
/**
* 操作dimensionConverter相关服务的client端工具类
*
* @author liangxw
*/
public class DimensionHandlerClient {
/**
* 创建连接对象
*/
public static IDimensionHandler createDimensionConnector(Configuration conf)
throws IOException {
// 读取配置文件
String[] serverConf = getDimensionServerConf(conf);
// 获取ip和端口号
String serverIp = serverConf[0]; // 获取ip地址
int serverPort = Integer.valueOf(serverConf[1]); // 获取端口号
// 创建代理对象
return new InnerDimensionHandlerProxy(conf, serverIp, serverPort);
}
/**
* 从hdfs文件中读取配置信息,ip地址和端口号
*/
private static String[] getDimensionServerConf(Configuration conf) throws IOException {
FileSystem fs;
BufferedReader br = null;
try {
fs = FileSystem.get(conf);
br = new BufferedReader(new InputStreamReader(fs.open(new Path(CONFIG_SAVE_PATH))));
String[] serverConf = new String[2];
serverConf[0] = br.readLine().trim(); // ip地址
serverConf[1] = br.readLine().trim(); // 端口号
return serverConf;
} finally {
if (br != null) {
try {
br.close();
} catch (Exception ee) {
// nothing
}
}
// 默认配置参数的情况下,这里不要调用fs.close()方法,因为可能fs这个对象在多个线程中公用
}
}
/**
* 关闭客户端连接
*/
public static void stopDimensionHandlerProxy(IDimensionHandler proxy) {
if (proxy != null) {
InnerDimensionHandlerProxy innerProxy = (InnerDimensionHandlerProxy) proxy;
RPC.stopProxy(innerProxy.proxy);
}
}
/**
* 内部代理类
* 增加缓存在本地磁盘的功能
*/
private static class InnerDimensionHandlerProxy implements IDimensionHandler {
// 远程连接代理对象
private IDimensionHandler proxy = null;
// 本地缓存dimension和对应的id
// 最多缓存1000条记录
private Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {
private static final long serialVersionUID = -731083744087467205L;
@Override
protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
return this.size() > 1000;
}
};
/**
* 构造函数,创建代理对象
*/
InnerDimensionHandlerProxy(Configuration conf, String address, int port)
throws IOException {
this.proxy = RPC.getProxy(
IDimensionHandler.class,
IDimensionHandler.versionID,
new InetSocketAddress(address, port),
conf
);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return this.proxy.getProtocolVersion(protocol, clientVersion);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
throws IOException {
return this.proxy.getProtocolSignature(protocol, clientVersion, clientMethodsHash);
}
@Override
public int getDimensionId(Dimension dimension) throws IOException {
// 创建cache的key值
String key = DimensionHandlerImpl.buildDimensionString(dimension);
// 首先从本地缓存中获取id值
Integer value = this.dimensionIdCache.get(key);
if (value == null) {
// 本地没有,则在服务器端进行获取
value = this.proxy.getDimensionId(dimension);
this.dimensionIdCache.put(key, value);
}
return value;
}
}
}