Skip to content

Commit 64af8c8

Browse files
committed
fix #2
1 parent 6c76d04 commit 64af8c8

25 files changed

+274
-281
lines changed

scripts/.gitignore

Lines changed: 0 additions & 1 deletion
This file was deleted.

scripts/db.yaml

Lines changed: 0 additions & 23 deletions
This file was deleted.

src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.codingapi.dbstream.driver;
22

33
import com.codingapi.dbstream.interceptor.SQLRunningContext;
4-
import com.codingapi.dbstream.listener.SQLDeleteExecuteListener;
5-
import com.codingapi.dbstream.listener.SQLInsertExecuteListener;
6-
import com.codingapi.dbstream.listener.SQLUpdateExecuteListener;
4+
import com.codingapi.dbstream.listener.stream.SQLDeleteExecuteListener;
5+
import com.codingapi.dbstream.listener.stream.SQLInsertExecuteListener;
6+
import com.codingapi.dbstream.listener.stream.SQLUpdateExecuteListener;
77
import com.codingapi.dbstream.proxy.ConnectionProxy;
88
import com.codingapi.dbstream.scanner.DBMetaContext;
99
import com.codingapi.dbstream.scanner.DBMetaData;

src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -201,22 +201,6 @@ public List<Object> getListParams() {
201201
return new ArrayList<>();
202202
}
203203

204-
/**
205-
* 获取执行的SQL队列
206-
* @return List
207-
*/
208-
public List<SQLExecuteParam> getBatchExecuteSQLParamList(){
209-
if(this.batchMode){
210-
if(this.sqlExecuteParams.isEmpty()){
211-
return new ArrayList<>();
212-
}
213-
int size = this.sqlExecuteParams.size();
214-
return this.sqlExecuteParams.subList(0,size-1);
215-
216-
}
217-
return new ArrayList<>();
218-
}
219-
220204

221205
/**
222206
* 获取Batch的SQLExecuteState

src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java

Lines changed: 0 additions & 60 deletions
This file was deleted.

src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java

Lines changed: 0 additions & 61 deletions
This file was deleted.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.codingapi.dbstream.listener.stream;
2+
3+
import com.codingapi.dbstream.interceptor.SQLExecuteState;
4+
import com.codingapi.dbstream.parser.DBEventParser;
5+
import com.codingapi.dbstream.parser.DeleteDBEventParser;
6+
import com.codingapi.dbstream.parser.DeleteSQLParser;
7+
import com.codingapi.dbstream.parser.SQLParser;
8+
import com.codingapi.dbstream.scanner.DbTable;
9+
import com.codingapi.dbstream.utils.SQLUtils;
10+
11+
public class SQLDeleteExecuteListener extends SQLStreamExecuteListener {
12+
13+
@Override
14+
public int order() {
15+
return 100;
16+
}
17+
18+
@Override
19+
public boolean isSupport(String sql) {
20+
return SQLUtils.isDeleteSQL(sql);
21+
}
22+
23+
@Override
24+
public SQLParser createSQLParser(String sql) {
25+
return new DeleteSQLParser(sql);
26+
}
27+
28+
@Override
29+
public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) {
30+
return new DeleteDBEventParser(sqlExecuteState, (DeleteSQLParser) sqlParser, dbTable);
31+
}
32+
33+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.codingapi.dbstream.listener.stream;
2+
3+
import com.codingapi.dbstream.interceptor.SQLExecuteState;
4+
import com.codingapi.dbstream.parser.DBEventParser;
5+
import com.codingapi.dbstream.parser.InsertDBEventParser;
6+
import com.codingapi.dbstream.parser.InsertSQLParser;
7+
import com.codingapi.dbstream.parser.SQLParser;
8+
import com.codingapi.dbstream.scanner.DbTable;
9+
import com.codingapi.dbstream.utils.SQLUtils;
10+
11+
public class SQLInsertExecuteListener extends SQLStreamExecuteListener {
12+
13+
@Override
14+
public int order() {
15+
return 100;
16+
}
17+
18+
@Override
19+
public boolean isSupport(String sql) {
20+
return SQLUtils.isInsertSQL(sql);
21+
}
22+
23+
@Override
24+
public SQLParser createSQLParser(String sql) {
25+
return new InsertSQLParser(sql);
26+
}
27+
28+
@Override
29+
public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) {
30+
return new InsertDBEventParser(sqlExecuteState, (InsertSQLParser) sqlParser, dbTable);
31+
}
32+
33+
34+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.codingapi.dbstream.listener.stream;
2+
3+
import com.codingapi.dbstream.DBStreamContext;
4+
import com.codingapi.dbstream.interceptor.SQLExecuteState;
5+
import com.codingapi.dbstream.listener.SQLExecuteListener;
6+
import com.codingapi.dbstream.parser.*;
7+
import com.codingapi.dbstream.scanner.DbTable;
8+
import com.codingapi.dbstream.stream.DBEvent;
9+
import com.codingapi.dbstream.stream.TransactionEventPools;
10+
11+
import java.sql.SQLException;
12+
import java.util.List;
13+
14+
public abstract class SQLStreamExecuteListener implements SQLExecuteListener {
15+
16+
public abstract boolean isSupport(String sql);
17+
18+
public abstract SQLParser createSQLParser(String sql);
19+
20+
public abstract DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState,SQLParser sqlParser,DbTable dbTable);
21+
22+
@Override
23+
public void before(SQLExecuteState executeState) throws SQLException {
24+
String sql = executeState.getSql();
25+
if (this.isSupport(sql)) {
26+
try {
27+
ThreadLocalContext.getInstance().remove();
28+
SQLParser sqlParser = this.createSQLParser(sql);
29+
String tableName = sqlParser.getTableName();
30+
executeState.updateMetaData(tableName);
31+
DbTable dbTable = executeState.getDbTable(tableName);
32+
if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) {
33+
if (executeState.isBatchMode()) {
34+
List<SQLExecuteState> executeStateList = executeState.getBatchSQLExecuteStateList();
35+
for (int i = 0; i < executeStateList.size(); i++) {
36+
SQLExecuteState sqlExecuteState = executeStateList.get(i);
37+
DBEventParser dataParser = this.createDbEventParser(sqlExecuteState,sqlParser,dbTable);
38+
dataParser.prepare();
39+
ThreadLocalContext.getInstance().push(i, dataParser);
40+
}
41+
} else {
42+
DBEventParser dataParser = this.createDbEventParser(executeState,sqlParser,dbTable);
43+
dataParser.prepare();
44+
ThreadLocalContext.getInstance().push(dataParser);
45+
}
46+
}
47+
} catch (Exception e) {
48+
ThreadLocalContext.getInstance().remove();
49+
throw new SQLException(e);
50+
}
51+
}
52+
}
53+
54+
@Override
55+
public void after(SQLExecuteState executeState, Object result) throws SQLException {
56+
String sql = executeState.getSql();
57+
String transactionKey = executeState.getTransactionKey();
58+
if (this.isSupport(sql)) {
59+
if (executeState.isBatchMode()) {
60+
List<SQLExecuteState> executeStateList = executeState.getBatchSQLExecuteStateList();
61+
for (int i = 0; i < executeStateList.size(); i++) {
62+
DBEventParser dataParser = ThreadLocalContext.getInstance().get(i);
63+
if (dataParser != null) {
64+
List<DBEvent> eventList = dataParser.loadEvents(result);
65+
TransactionEventPools.getInstance().addEvents(transactionKey, eventList);
66+
}
67+
}
68+
ThreadLocalContext.getInstance().remove();
69+
} else {
70+
DBEventParser dataParser = ThreadLocalContext.getInstance().get();
71+
if (dataParser != null) {
72+
List<DBEvent> eventList = dataParser.loadEvents(result);
73+
TransactionEventPools.getInstance().addEvents(transactionKey, eventList);
74+
}
75+
ThreadLocalContext.getInstance().remove();
76+
}
77+
}
78+
}
79+
80+
}

0 commit comments

Comments
 (0)