Browse Source

add trace id

穿云 3 weeks ago
parent
commit
b908a69f47

+ 2 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/flush/impl/GeneralBufferActuator.java

@@ -114,6 +114,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
             response.setChangedOffset(request.getChangedOffset());
         }
         if (!response.isMerged()) {
+            response.setTraceId(request.getTraceId());
             response.setTableName(request.getTableName());
             response.setEvent(request.getEvent());
             response.setTypeEnum(request.getTypeEnum());
@@ -198,6 +199,7 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
         context.setTargetConnectorInstance(connectorFactory.connect(getConnectorConfig(mapping.getTargetConnectorId())));
         context.setSourceTableName(tableGroup.getSourceTable().getName());
         context.setTargetTableName(tableGroup.getTargetTable().getName());
+        context.setTraceId(response.getTraceId());
         context.setEvent(response.getEvent());
         context.setTargetFields(tableGroupPicker.getTargetFields());
         context.setCommand(tableGroup.getCommand());

+ 9 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/AbstractWriter.java

@@ -20,6 +20,8 @@ public abstract class AbstractWriter {
 
     private String sql;
 
+    private String traceId;
+
     public ChangedEventTypeEnum getTypeEnum() {
         return typeEnum;
     }
@@ -60,4 +62,11 @@ public abstract class AbstractWriter {
         this.sql = sql;
     }
 
+    public String getTraceId() {
+        return traceId;
+    }
+
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
 }

+ 1 - 0
dbsyncer-parser/src/main/java/org/dbsyncer/parser/model/WriterRequest.java

@@ -15,6 +15,7 @@ public class WriterRequest extends AbstractWriter implements BufferRequest {
     private final List<Object> row;
 
     public WriterRequest(ChangedEvent event) {
+        setTraceId(event.getTraceId());
         setTypeEnum(event.getType());
         setChangedOffset(event.getChangedOffset());
         setTableName(event.getSourceTableName());

+ 4 - 3
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/connector/database/AbstractDatabaseConnector.java

@@ -582,7 +582,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             }
             final String event = existRow(connectorInstance, queryCount, args) ? ConnectorConstant.OPERTION_UPDATE
                     : ConnectorConstant.OPERTION_INSERT;
-            logger.warn("{}表执行{}失败, 重新执行{}, {}", context.getTargetTableName(), context.getEvent(), event, row);
+            logger.warn("{} {}表执行{}失败, 重新执行{}, {}", context.getTraceId(), context.getTargetTableName(), context.getEvent(), event, row);
             writer(result, connectorInstance, context, pkFields, row, event);
         }
     }
@@ -607,12 +607,13 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector implem
             // 2、设置参数
             int execute = connectorInstance.execute(databaseTemplate -> databaseTemplate.update(sql, batchRow(fields, row)));
             if (execute == 0) {
-                throw new SdkException(String.format("尝试执行[%s]失败", event));
+                throw new SdkException(String.format("%s 尝试执行[%s]失败", context.getTraceId(), event));
             }
             result.getSuccessData().add(row);
         } catch (Exception e) {
             result.getFailData().add(row);
-            result.getError().append("SQL:").append(sql).append(System.lineSeparator())
+            result.getError().append(context.getTraceId())
+                    .append(" SQL:").append(sql).append(System.lineSeparator())
                     .append("DATA:").append(row).append(System.lineSeparator())
                     .append("ERROR:").append(e.getMessage()).append(System.lineSeparator());
             logger.error("执行{}失败: {}, DATA:{}", event, e.getMessage(), row);

+ 10 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/ChangedEvent.java

@@ -20,6 +20,16 @@ import java.util.List;
  */
 public interface ChangedEvent {
 
+    /**
+     * 获取traceId
+     */
+    String getTraceId();
+
+    /**
+     * 设置traceId
+     */
+    void setTraceId(String traceId);
+
     /**
      * 获取变更事件类型
      */

+ 14 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/listener/event/CommonChangedEvent.java

@@ -15,6 +15,10 @@ import org.dbsyncer.sdk.model.ChangedOffset;
  */
 public abstract class CommonChangedEvent implements ChangedEvent {
 
+    /**
+     * traceId
+     */
+    private String traceId;
     /**
      * 变更表名称
      */
@@ -28,6 +32,16 @@ public abstract class CommonChangedEvent implements ChangedEvent {
      */
     private final ChangedOffset changedOffset = new ChangedOffset();
 
+    @Override
+    public String getTraceId() {
+        return traceId;
+    }
+
+    @Override
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
+
     @Override
     public String getSourceTableName() {
         return sourceTableName;

+ 11 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/AbstractPluginContext.java

@@ -74,6 +74,8 @@ public abstract class AbstractPluginContext extends AbstractBaseContext implemen
      */
     private String pluginExtInfo;
 
+    private String traceId;
+
     @Override
     public boolean isTerminated() {
         return terminated;
@@ -184,6 +186,15 @@ public abstract class AbstractPluginContext extends AbstractBaseContext implemen
         this.pluginExtInfo = pluginExtInfo;
     }
 
+    @Override
+    public String getTraceId() {
+        return traceId;
+    }
+
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
+
     @Override
     public Object clone() throws CloneNotSupportedException {
         return super.clone();

+ 7 - 0
dbsyncer-sdk/src/main/java/org/dbsyncer/sdk/plugin/PluginContext.java

@@ -99,6 +99,13 @@ public interface PluginContext extends BaseContext {
      */
     String getPluginExtInfo();
 
+    /**
+     * 获取TraceId
+     *
+     * @return
+     */
+    String getTraceId();
+
     /**
      * 浅拷贝
      *