穿云 2 bulan lalu
induk
melakukan
9ef0bae298

+ 1 - 0
dbsyncer-connector/dbsyncer-connector-mysql/src/main/java/org/dbsyncer/connector/mysql/cdc/MySQLListener.java

@@ -347,6 +347,7 @@ public class MySQLListener extends AbstractDatabaseListener {
             if (StringUtil.isBlank(databaseName)) {
                 databaseName = data.getDatabase();
             }
+            databaseName = StringUtil.replace(databaseName, "`", "");
             if (isFilterTable(databaseName, tableName)) {
                 logger.info("sql:{}", data.getSql());
                 trySendEvent(new DDLChangedEvent(tableName, ConnectorConstant.OPERTION_ALTER,

+ 2 - 1
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/AbstractBufferActuator.java

@@ -23,6 +23,7 @@ import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -179,7 +180,7 @@ public abstract class AbstractBufferActuator<Request extends BufferRequest, Resp
     public void run() {
         boolean locked = false;
         try {
-            locked = taskLock.tryLock();
+            locked = taskLock.tryLock(3, TimeUnit.SECONDS);
             if (locked) {
                 submit();
             }

+ 5 - 2
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/BufferActuatorRouter.java

@@ -98,8 +98,11 @@ public final class BufferActuatorRouter implements DisposableBean {
         if (ChangedEventTypeEnum.isDDL(event.getType())) {
             WriterRequest request = new WriterRequest(event);
             // DDL事件,阻塞等待队列消费完成
-            while (actuator.getQueue().isEmpty() && actuator.isRunning(request)){
-                actuator.offer(request);
+            while (actuator.isRunning(request)) {
+                if (actuator.getQueue().isEmpty()) {
+                    actuator.offer(request);
+                    return;
+                }
                 try {
                     TimeUnit.MILLISECONDS.sleep(10);
                 } catch (InterruptedException ex) {

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/AbstractQuartzListener.java

@@ -87,7 +87,7 @@ public abstract class AbstractQuartzListener extends AbstractListener implements
         final Lock taskLock = lock;
         boolean locked = false;
         try {
-            locked = taskLock.tryLock();
+            locked = taskLock.tryLock(3, TimeUnit.SECONDS);
             if (locked) {
                 for (int i = 0; i < commands.size(); i++) {
                     execute(commands.get(i), i);

+ 1 - 1
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/storage/AbstractStorageService.java

@@ -66,7 +66,7 @@ public abstract class AbstractStorageService implements StorageService, Disposab
                 return select(sharding, query);
             }
         } catch (InterruptedException e) {
-            logger.warn("tryLock error", e.getLocalizedMessage());
+            logger.warn("tryLock error:{}", e.getLocalizedMessage());
         } catch (NullExecutorException e) {
             // 存储表不存在或已删除,请重试
         } finally {