有效数据生成以及插入数据库方案
先产生insert数据并存到备份文件中
因为有效数据生成的数量不大, 按照压测那边给我的需求大概每个交易4千笔数据左右,需要并发量比较高的交易也不过4万笔数据,涉及到转账的交易数据量比较高一点。并且需要做一个备份为了以后压测可以备用,因此我选择先生成到不同的表对应的表名文件中,然后再写一个批量执行SQL的程序执行这些文件中的insert语句
下面的代码做了脱敏处理:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class MultiThreadScript {
private static final int THREAD_POOL_SIZE = 4; // 线程池大小
private static final String INPUT_FILE_PATH = "input.sql"; // 输入文件路径
private static final String OUTPUT_FILE_PATH = "output.sql"; // 输出文件路径
private static final String INSERT_REGEX = "(?i)^insert into .* values\\s*\\((.*)\\);?$"; // insert语句的正则表达式
private static final String PK_REGEX = "'[0-9A-Za-z]+'"; // 主键的正则表达式
private static final int PK_INDEX = 0; // 主键在值列表中的索引
public static void main(String[] args) throws Exception {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
try (BufferedReader reader = new BufferedReader(new FileReader(INPUT_FILE_PATH))) {
String line;
while ((line = reader.readLine()) != null) {
if (isInsertStatement(line)) {
executor.execute(new InsertTask(line));
}
}
}
// 关闭线程池并等待所有任务完成
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
}
// 判断一行文本是否为insert语句
private static boolean isInsertStatement(String line) {
return line.matches(INSERT_REGEX);
}
// 插入任务
private static class InsertTask implements Runnable {
private final String originalSql;
public InsertTask(String originalSql) {
this.originalSql = originalSql;
}
@Override
public void run() {
try {
// 提取主键
Pattern pkPattern = Pattern.compile(PK_REGEX);
Matcher pkMatcher = pkPattern.matcher(originalSql);
pkMatcher.find();
String originalPk = pkMatcher.group();
// 提取值列表
String valueList = originalSql.replaceAll(INSERT_REGEX, "$1");
String[] values = valueList.split(",");
// 递增主键并生成新的SQL语句
StringBuilder newSqlBuilder = new StringBuilder();
for (int i = 0; i < 40000; i++) {
String newPk = getNextPk(originalPk);
String newValueList = valueList.replace(originalPk, newPk);
String newSql = originalSql.replaceAll(valueList, newValueList);
newSqlBuilder.append(newSql).append("\n");
}
// 写入输出文件
synchronized (MultiThreadScript.class) {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(OUTPUT_FILE_PATH, true))) {
writer.write(newSqlBuilder.toString());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 获取下一个主键
private String getNextPk (String originalPk) {
String prefix = originalPk.substring(0, originalPk.length() - 1);
String suffix = originalPk.substring(originalPk.length() - 1);
String newSuffix = getNextSuffix(suffix);
return prefix + newSuffix;
}
// 获取下一个主键后缀
private String getNextSuffix(String suffix) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < suffix.length(); i++) {
char c = suffix.charAt(i);
if (Character.isDigit(c)) {
int digit = Character.getNumericValue(c);
if (digit == 9) {
sb.append('A');
} else if (digit == 35) {
sb.append('a');
} else {
sb.append(Character.forDigit(digit + 1, 36));
}
} else if (Character.isLetter(c)) {
if (c == 'Z') {
sb.append('0');
} else if (c == 'z') {
sb.append('0');
} else {
sb.append((char) (c + 1));
}
} else {
sb.append(c);
}
}
return sb.toString();
}
}
上面的代码中,MultiThreadScript类是脚本的主类,它负责读取输入文件并创建线程池来处理每条insert语句。InsertTask类是插入任务类,它实现了Runnable接口,用于递增主键并生成新的SQL语句。为了避免多个线程同时写入输出文件,InsertTask类中使用了synchronized关键字来进行同步。
在getNextPk()方法中,我使用了类似于Excel中列名的递增方式来递增主键。首先,将原始主键分为前缀和后缀两部分,其中前缀是主键的前面部分,后缀是主键的最后一位字符。然后,对后缀进行递增,并根据递增后的后缀重新生成新的主键。
最后,需要注意的是,由于主键可能包含字母和数字,因此使用36进制来对主键进行递增。例如,对于主键值为"001",它的下一个值为"002";对于主键值为"AZ9",它的下一个值为"BA0"。
进行插入操作(脱敏处理后的代码):
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class InsertExecutor {
private static final String URL = "jdbc:mysql://localhost:3306/mydatabase";
private static final String USER = "myuser";
private static final String PASSWORD = "mypassword";
private static final int THREAD_POOL_SIZE = 10;
public static void main(String[] args) {
try {
// 读取insert语句文件
BufferedReader reader = new BufferedReader(new FileReader("inserts.sql"));
String line;
Queue<String> inserts = new LinkedList<>();
while ((line = reader.readLine()) != null) {
inserts.add(line);
}
reader.close();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 执行insert语句
while (!inserts.isEmpty()) {
String insert = inserts.poll();
executorService.execute(new InsertWorker(insert));
}
// 关闭线程池
executorService.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
static class InsertWorker implements Runnable {
private String insert;
public InsertWorker(String insert) {
this.insert = insert;
}
@Override
public void run() {
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
PreparedStatement statement = conn.prepareStatement(insert)) {
// 执行insert语句
statement.executeUpdate();
} catch (SQLException e) {
// 主键冲突,跳过该语句
if (e.getErrorCode() == 1062) {
System.out.println("Skip duplicate insert: " + insert);
} else {
e.printStackTrace();
}
}
}
}
}
上面代码中我们把insert.sql取代为我们想要进行批量insert的sql文件即可,线程数量可根据CPU的情况来看,在不进行其他工作任务的情况下,可尽量压榨CPU的使用率以达到最高的效率。
亿级别的无效数据生成并插入数据库方案
这里因为涉及到的数据量特别大, 一般是模拟生产环境,因此一张表可能有千万级别以及亿级别的数据量,因此我选择一边生成一边做insert操作。也就是一个生产者一个消费者,当然,这里都是多线程来操作的。一开始我是每次达到20个事务一次提交的,后来换了OceanBase后,只能一次提交一个事务了,效率也变满了一点点. 对于多线程插入数据库,将生成的SQL语句分配给多个线程,每个线程使用单独的数据库连接插入数据库,可以使用线程池来管理多个线程
下面是脱敏后的代码:
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MultiThreadedSqlInsert {
// 数据库连接信息
private static final String DB_URL = "jdbc:mysql://localhost:3306/mydb";
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "mypassword";
// 主键列名和初始值
private static final String PK_COLUMN_NAME = "id";
private static final String PK_INITIAL_VALUE = "1000";
// 线程数和每个线程处理的主键值个数
private static final int THREAD_COUNT = 10;
private static final int KEYS_PER_THREAD = 10000000;
// 文件名和队列大小
private static final String FILE_NAME = "data.sql";
private static final int QUEUE_SIZE = 10000;
public static void main(String[] args) throws Exception {
// 读取文件中的 SQL 语句
String sql = readSqlFromFile(FILE_NAME);
// 创建线程池和队列
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
// 创建多个线程,为每个线程分配一段主键值的区间
for (int i = 0; i < THREAD_COUNT; i++) {
int start = i * KEYS_PER_THREAD;
int end = (i + 1) * KEYS_PER_THREAD - 1;
executor.submit(new SqlGenerator(sql, start, end, queue));
}
// 创建多个数据库连接,为每个连接分配一个线程
for (int i = 0; i < THREAD_COUNT; i++) {
Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
executor.submit(new SqlExecutor(conn, queue));
}
// 等待所有线程执行完毕
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
// 从文件中读取 SQL 语句
private static String readSqlFromFile(String fileName) throws Exception {
try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
} return sb.toString();
}
}
// 生成新的 SQL 语句
private static String generateSql(String sql, int key) {
String pkValue = PK_INITIAL_VALUE + key;
return sql.replaceFirst(PK_COLUMN_NAME, pkValue);
}
// 生成新的 SQL 语句的线程
private static class SqlGenerator implements Runnable {
private final String sql;
private final int start;
private final int end;
private final BlockingQueue<String> queue;
public SqlGenerator(String sql, int start, int end, BlockingQueue<String> queue) {
this.sql = sql;
this.start = start;
this.end = end;
this.queue = queue;
}
@Override
public void run() {
for (int i = start; i <= end; i++) {
String newSql = generateSql(sql, i);
try {
queue.put(newSql);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
// 执行 SQL 语句的线程
private static class SqlExecutor implements Runnable {
private final Connection conn;
private final BlockingQueue<String> queue;
public SqlExecutor(Connection conn, BlockingQueue<String> queue) {
this.conn = conn;
this.queue = queue;
}
@Override
public void run() {
try (PreparedStatement stmt = conn.prepareStatement("")) {
while (true) {
String sql = queue.take();
if (sql == null) {
break;
}
stmt.addBatch(sql);
if (stmt.getBatchSize() >= 1000) {
stmt.executeBatch();
}
}
stmt.executeBatch();
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
}
这里使用了两个线程池,一个用于生成新的 SQL 语句,一个用于执行 SQL 语句。生成 SQL 语句的线程将生成的 SQL 语句存储到一个线程安全的队列中,执行 SQL 语句的线程从队列中取出 SQL 语句并执行插入操作。程序使用了 JDBC 连接 MySQL 数据库,并使用了 PreparedStatement 批量执行 SQL 语句,以提高插入效率。
需要注意的是,为了避免多个线程同时操作数据库导致数据不一致的问题,每个线程使用了自己的数据库连接。此外,程序还使用了线程安全的队列和加锁机制来保证线程安全。
总结
以上就是我在工作中遇到的问题之一,做一个小结,用到了很多线程和线程池的地方,以及操作数据库相关的知识。