Skip to content

Commit 98eca8a

Browse files
committed
add code remark
1 parent 6ef82b0 commit 98eca8a

20 files changed

+240
-128
lines changed

src/main/java/com/codingapi/dbstream/DBStreamContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.codingapi.dbstream;
22

3-
import com.codingapi.dbstream.interceptor.SQLRunningContext;
3+
import com.codingapi.dbstream.interceptor.SQLExecuteListenerContext;
44
import com.codingapi.dbstream.listener.SQLExecuteListener;
55
import com.codingapi.dbstream.provider.DBTableSupportProvider;
66
import com.codingapi.dbstream.provider.DefaultDBTableSupportProvider;
@@ -37,7 +37,7 @@ private DBStreamContext() {
3737
* @param sqlExecuteListener 订阅
3838
*/
3939
public void addListener(SQLExecuteListener sqlExecuteListener) {
40-
SQLRunningContext.getInstance().addListener(sqlExecuteListener);
40+
SQLExecuteListenerContext.getInstance().addListener(sqlExecuteListener);
4141
}
4242

4343
/**

src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.codingapi.dbstream.driver;
22

3-
import com.codingapi.dbstream.interceptor.SQLRunningContext;
3+
import com.codingapi.dbstream.interceptor.SQLExecuteListenerContext;
44
import com.codingapi.dbstream.listener.stream.SQLDeleteExecuteListener;
55
import com.codingapi.dbstream.listener.stream.SQLInsertExecuteListener;
66
import com.codingapi.dbstream.listener.stream.SQLUpdateExecuteListener;
@@ -30,9 +30,9 @@ public class DBStreamProxyDriver implements Driver {
3030
LOGGER.log(Level.SEVERE, "Failed to register DBStreamProxyDriver", e);
3131
throw new RuntimeException("Failed to register DBStreamProxyDriver", e);
3232
}
33-
SQLRunningContext.getInstance().addListener(new SQLDeleteExecuteListener());
34-
SQLRunningContext.getInstance().addListener(new SQLInsertExecuteListener());
35-
SQLRunningContext.getInstance().addListener(new SQLUpdateExecuteListener());
33+
SQLExecuteListenerContext.getInstance().addListener(new SQLDeleteExecuteListener());
34+
SQLExecuteListenerContext.getInstance().addListener(new SQLInsertExecuteListener());
35+
SQLExecuteListenerContext.getInstance().addListener(new SQLUpdateExecuteListener());
3636
LOGGER.info("DBStreamProxyDriver initialized and registered");
3737
}
3838

src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java renamed to src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteListenerContext.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,24 @@
88
import java.util.List;
99
import java.util.concurrent.CopyOnWriteArrayList;
1010

11-
public class SQLRunningContext {
11+
/**
12+
* SQLExecuteListener 上下文对象
13+
*/
14+
public class SQLExecuteListenerContext {
1215

1316
@Getter
14-
private final static SQLRunningContext instance = new SQLRunningContext();
17+
private final static SQLExecuteListenerContext instance = new SQLExecuteListenerContext();
1518

1619
@Getter
1720
private final List<SQLExecuteListener> listeners = new CopyOnWriteArrayList<>();
1821

19-
private SQLRunningContext() {
22+
private SQLExecuteListenerContext() {
2023

2124
}
2225

26+
/**
27+
* 添加SQL执行监听器
28+
*/
2329
public void addListener(SQLExecuteListener listener) {
2430
if (listener != null) {
2531
listeners.add(listener);
@@ -29,6 +35,9 @@ public void addListener(SQLExecuteListener listener) {
2935
}
3036

3137

38+
/**
39+
* SQL执行后拦截
40+
*/
3241
public void after(SQLExecuteState executeState, Object result) throws SQLException {
3342
executeState.setResult(result);
3443
executeState.after();
@@ -38,6 +47,9 @@ public void after(SQLExecuteState executeState, Object result) throws SQLExcepti
3847
}
3948

4049

50+
/**
51+
* SQL执行前拦截
52+
*/
4153
public void before(SQLExecuteState executeState) throws SQLException {
4254
executeState.begin();
4355
for (SQLExecuteListener listener : listeners) {

src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public String getJdbcKey() {
359359
*
360360
* @param tableName 表名
361361
*/
362-
public void updateMetaData(String tableName) throws SQLException {
362+
public void triggerDBMetaData(String tableName) throws SQLException {
363363
// 当前表需要更新时,将会连同所有带更新的表一次性全部更新
364364
if (this.metaData.isUpdateTableMeta(tableName)) {
365365
DBScanner dbScanner = new DBScanner(connectionProxy.getConnection(), getDriverProperties());

src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java renamed to src/main/java/com/codingapi/dbstream/listener/stream/DBEventExecuteListener.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,66 @@
1111
import java.sql.SQLException;
1212
import java.util.List;
1313

14-
public abstract class SQLStreamExecuteListener implements SQLExecuteListener {
14+
/**
15+
* 数据事件解析监听对象
16+
*/
17+
public abstract class DBEventExecuteListener implements SQLExecuteListener {
1518

16-
public abstract boolean isSupport(String sql);
19+
/**
20+
* SQL适配检测
21+
*/
22+
public abstract boolean support(String sql);
1723

24+
/**
25+
* 构建对应处理的SQL解析对象
26+
*/
1827
public abstract SQLParser createSQLParser(String sql);
1928

29+
/**
30+
* 构建对应处理的事件解析对象
31+
*/
2032
public abstract DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState,SQLParser sqlParser,DbTable dbTable);
2133

2234
@Override
2335
public void before(SQLExecuteState executeState) throws SQLException {
2436
String sql = executeState.getSql();
25-
if (this.isSupport(sql)) {
37+
if (this.support(sql)) {
2638
try {
39+
// 清空执行历史
2740
ThreadLocalContext.getInstance().remove();
41+
// 获取SQL解析对象
2842
SQLParser sqlParser = this.createSQLParser(sql);
43+
// 提取表明
2944
String tableName = sqlParser.getTableName();
30-
executeState.updateMetaData(tableName);
45+
// 触发待更新的元数据表信息
46+
executeState.triggerDBMetaData(tableName);
47+
// 提取对应元数据表信息
3148
DbTable dbTable = executeState.getDbTable(tableName);
49+
// 判断是否支持对该表的DB事件支持
3250
if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) {
51+
// 是否批量模式判断
3352
if (executeState.isBatchMode()) {
53+
// 批量模式下,将获取批量的SQL执行结果数据
3454
List<SQLExecuteState> executeStateList = executeState.getBatchSQLExecuteStateList();
3555
for (int i = 0; i < executeStateList.size(); i++) {
3656
SQLExecuteState sqlExecuteState = executeStateList.get(i);
3757
DBEventParser dataParser = this.createDbEventParser(sqlExecuteState,sqlParser,dbTable);
58+
// DB事件解析前置
3859
dataParser.prepare();
60+
// 存储到本地线程
3961
ThreadLocalContext.getInstance().push(i, dataParser);
4062
}
4163
} else {
64+
// 非批量模式执行
4265
DBEventParser dataParser = this.createDbEventParser(executeState,sqlParser,dbTable);
66+
// DB事件解析前置
4367
dataParser.prepare();
68+
// 存储到本地线程
4469
ThreadLocalContext.getInstance().push(dataParser);
4570
}
4671
}
4772
} catch (Exception e) {
73+
// 异常清空缓存数据
4874
ThreadLocalContext.getInstance().remove();
4975
throw new SQLException(e);
5076
}
@@ -54,24 +80,31 @@ public void before(SQLExecuteState executeState) throws SQLException {
5480
@Override
5581
public void after(SQLExecuteState executeState, Object result) throws SQLException {
5682
String sql = executeState.getSql();
83+
// 获取事务标识信息
5784
String transactionKey = executeState.getTransactionKey();
58-
if (this.isSupport(sql)) {
85+
if (this.support(sql)) {
86+
// 批量模式
5987
if (executeState.isBatchMode()) {
6088
List<SQLExecuteState> executeStateList = executeState.getBatchSQLExecuteStateList();
6189
for (int i = 0; i < executeStateList.size(); i++) {
6290
DBEventParser dataParser = ThreadLocalContext.getInstance().get(i);
6391
if (dataParser != null) {
92+
// 获取DB事件信息
6493
List<DBEvent> eventList = dataParser.loadEvents(result);
6594
TransactionEventPools.getInstance().addEvents(transactionKey, eventList);
6695
}
6796
}
97+
// 清空本地缓存数据
6898
ThreadLocalContext.getInstance().remove();
6999
} else {
100+
// 非批量模式
70101
DBEventParser dataParser = ThreadLocalContext.getInstance().get();
71102
if (dataParser != null) {
103+
// 获取DB事件信息
72104
List<DBEvent> eventList = dataParser.loadEvents(result);
73105
TransactionEventPools.getInstance().addEvents(transactionKey, eventList);
74106
}
107+
// 清空本地缓存数据
75108
ThreadLocalContext.getInstance().remove();
76109
}
77110
}

src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
import com.codingapi.dbstream.scanner.DbTable;
99
import com.codingapi.dbstream.utils.SQLUtils;
1010

11-
public class SQLDeleteExecuteListener extends SQLStreamExecuteListener {
11+
public class SQLDeleteExecuteListener extends DBEventExecuteListener {
1212

1313
@Override
1414
public int order() {
1515
return 100;
1616
}
1717

1818
@Override
19-
public boolean isSupport(String sql) {
19+
public boolean support(String sql) {
2020
return SQLUtils.isDeleteSQL(sql);
2121
}
2222

src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
import com.codingapi.dbstream.scanner.DbTable;
99
import com.codingapi.dbstream.utils.SQLUtils;
1010

11-
public class SQLInsertExecuteListener extends SQLStreamExecuteListener {
11+
public class SQLInsertExecuteListener extends DBEventExecuteListener {
1212

1313
@Override
1414
public int order() {
1515
return 100;
1616
}
1717

1818
@Override
19-
public boolean isSupport(String sql) {
19+
public boolean support(String sql) {
2020
return SQLUtils.isInsertSQL(sql);
2121
}
2222

src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
import com.codingapi.dbstream.scanner.DbTable;
99
import com.codingapi.dbstream.utils.SQLUtils;
1010

11-
public class SQLUpdateExecuteListener extends SQLStreamExecuteListener {
11+
public class SQLUpdateExecuteListener extends DBEventExecuteListener {
1212

1313
@Override
1414
public int order() {
1515
return 100;
1616
}
1717

1818
@Override
19-
public boolean isSupport(String sql) {
19+
public boolean support(String sql) {
2020
return SQLUtils.isUpdateSQL(sql);
2121
}
2222

src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@
66
import java.util.Map;
77
import java.util.concurrent.ConcurrentHashMap;
88

9+
/**
10+
* SQLStreamExecuteListener 下的线程上下文操作对象
11+
*/
912
class ThreadLocalContext {
1013

14+
/**
15+
* 缓存的数据,key 为index对应batch数据的顺序 value为 相应的事件解析对象
16+
*/
1117
private final Map<Integer, DBEventParser> cache;
1218

1319
private final ThreadLocal<ThreadLocalContext> threadLocal = new ThreadLocal<>();

src/main/java/com/codingapi/dbstream/parser/DBEventParser.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@
55
import java.sql.SQLException;
66
import java.util.List;
77

8+
/**
9+
* DBEvent 事件解析
10+
*/
811
public interface DBEventParser {
912

13+
/**
14+
* 预先处理
15+
*/
1016
void prepare() throws SQLException;
1117

18+
/**
19+
* 加载DBEvent事件数据
20+
* @param result sql执行结果返回值
21+
* @return List<DBEvent>
22+
*/
1223
List<DBEvent> loadEvents(Object result) throws SQLException;
1324
}

0 commit comments

Comments
 (0)