Skip to content

Commit ce2f88a

Browse files
committed
fix batch result check
1 parent 1d1cb68 commit ce2f88a

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed

src/main/java/com/codingapi/dbstream/listener/dbevent/DBEventExecuteListener.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package com.codingapi.dbstream.listener.dbevent;
22

33
import com.codingapi.dbstream.DBStreamContext;
4+
import com.codingapi.dbstream.event.DBEvent;
5+
import com.codingapi.dbstream.event.TransactionEventPools;
46
import com.codingapi.dbstream.interceptor.SQLExecuteState;
57
import com.codingapi.dbstream.listener.SQLExecuteListener;
6-
import com.codingapi.dbstream.parser.*;
8+
import com.codingapi.dbstream.parser.DBEventParser;
9+
import com.codingapi.dbstream.parser.SQLParser;
710
import com.codingapi.dbstream.scanner.DbTable;
8-
import com.codingapi.dbstream.event.DBEvent;
9-
import com.codingapi.dbstream.event.TransactionEventPools;
1011

1112
import java.sql.SQLException;
13+
import java.util.ArrayList;
1214
import java.util.List;
1315

1416
/**
@@ -77,6 +79,35 @@ public void before(SQLExecuteState executeState) throws SQLException {
7779
}
7880
}
7981

82+
83+
/**
84+
* batch result 结果数组转化
85+
*/
86+
private List<Object> batchResultToArrays(Object result, int size) {
87+
List<Object> list = new ArrayList<>();
88+
if (result instanceof int[]) {
89+
int[] rows = (int[]) result;
90+
for (int row : rows) {
91+
list.add(row);
92+
}
93+
return list;
94+
}
95+
if (result instanceof long[]) {
96+
long[] rows = (long[]) result;
97+
for (long row : rows) {
98+
list.add(row);
99+
}
100+
return list;
101+
}
102+
103+
// 如果非int[] 和 long[] 的返回数据,则直接返回0,忽略事件
104+
for (int i = 0; i < size; i++) {
105+
list.add(0);
106+
}
107+
return list;
108+
109+
}
110+
80111
@Override
81112
public void after(SQLExecuteState executeState, Object result) throws SQLException {
82113
String sql = executeState.getSql();
@@ -86,11 +117,16 @@ public void after(SQLExecuteState executeState, Object result) throws SQLExcepti
86117
// 批量模式
87118
if (executeState.isBatchMode()) {
88119
List<SQLExecuteState> executeStateList = executeState.getBatchSQLExecuteStateList();
89-
for (int i = 0; i < executeStateList.size(); i++) {
120+
int batchSize = executeStateList.size();
121+
122+
//批量模式下的返回数据是数组格式
123+
List<Object> arrays = this.batchResultToArrays(result, batchSize);
124+
125+
for (int i = 0; i < batchSize; i++) {
90126
DBEventParser dataParser = ThreadLocalContext.getInstance().get(i);
91127
if (dataParser != null) {
92128
// 获取DB事件信息
93-
List<DBEvent> eventList = dataParser.loadEvents(result);
129+
List<DBEvent> eventList = dataParser.loadEvents(arrays.get(i));
94130
TransactionEventPools.getInstance().addEvents(transactionKey, eventList);
95131
}
96132
}

src/test/resources/application.properties

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ spring.application.name=example
22
spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver
33
spring.datasource.url=jdbc:h2:file:./test.db
44
spring.jpa.hibernate.ddl-auto=update
5-
#spring.jpa.properties.hibernate.jdbc.batch_size=1000
6-
#spring.jpa.properties.hibernate.order_inserts=true
7-
#spring.jpa.properties.hibernate.order_updates=true
8-
#spring.jpa.properties.hibernate.order_deletes=true
9-
#spring.jpa.properties.hibernate.batch_versioned_data=true
10-
#spring.jpa.properties.hibernate.flush.mode=COMMIT
5+
spring.jpa.properties.hibernate.jdbc.batch_size=1000
6+
spring.jpa.properties.hibernate.order_inserts=true
7+
spring.jpa.properties.hibernate.order_updates=true
8+
spring.jpa.properties.hibernate.order_deletes=true
9+
spring.jpa.properties.hibernate.batch_versioned_data=true
10+
spring.jpa.properties.hibernate.flush.mode=COMMIT

0 commit comments

Comments
 (0)