|
@@ -1,5 +1,9 @@
|
|
|
package org.dbsyncer.parser.flush.impl;
|
|
|
|
|
|
+import net.sf.jsqlparser.JSQLParserException;
|
|
|
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
|
|
+import net.sf.jsqlparser.statement.alter.Alter;
|
|
|
+import net.sf.jsqlparser.statement.alter.AlterExpression;
|
|
|
import org.dbsyncer.cache.CacheService;
|
|
|
import org.dbsyncer.common.config.GeneralBufferConfig;
|
|
|
import org.dbsyncer.common.event.RefreshOffsetEvent;
|
|
@@ -12,10 +16,12 @@ import org.dbsyncer.common.util.CollectionUtils;
|
|
|
import org.dbsyncer.common.util.StringUtil;
|
|
|
import org.dbsyncer.connector.ConnectorFactory;
|
|
|
import org.dbsyncer.connector.constant.ConnectorConstant;
|
|
|
+import org.dbsyncer.connector.database.DatabaseConnectorMapper;
|
|
|
import org.dbsyncer.parser.ParserFactory;
|
|
|
import org.dbsyncer.parser.flush.AbstractBufferActuator;
|
|
|
import org.dbsyncer.parser.model.BatchWriter;
|
|
|
import org.dbsyncer.parser.model.Connector;
|
|
|
+import org.dbsyncer.parser.model.FieldMapping;
|
|
|
import org.dbsyncer.parser.model.Mapping;
|
|
|
import org.dbsyncer.parser.model.Picker;
|
|
|
import org.dbsyncer.parser.model.TableGroup;
|
|
@@ -121,7 +127,32 @@ public class GeneralBufferActuator extends AbstractBufferActuator<WriterRequest,
|
|
|
//转换到目标数据源的sql语句
|
|
|
String sourceSql = response.getSql();
|
|
|
DDLConvertContext ddlConvertContext = new DDLConvertContext(sConnectorMapper, tConnectorMapper, sourceTableName, targetTableName, event,sourceSql);
|
|
|
- ddlConvertContext.convertSql();
|
|
|
+// ddlConvertContext.convertSql();
|
|
|
+ //先确定转换的sql,源数据库类型的sql-》目标数据库类型的sql
|
|
|
+ //如果都是db,那么db只是对应的字段进行映射,意思是只替换映射字段
|
|
|
+ if (sConnectorMapper instanceof DatabaseConnectorMapper && tConnectorMapper instanceof DatabaseConnectorMapper){
|
|
|
+ try {
|
|
|
+ Alter alter = (Alter) CCJSqlParserUtil.parse(sourceSql);
|
|
|
+ for (AlterExpression alterExpression:alter.getAlterExpressions()) {
|
|
|
+ for (AlterExpression.ColumnDataType columnDataType:alterExpression.getColDataTypeList()) {
|
|
|
+ String columName = columnDataType.getColumnName();
|
|
|
+ columName = columName.replaceAll("`","");
|
|
|
+ for (FieldMapping fieldMapping :group.getFieldMapping()) {
|
|
|
+ if (fieldMapping.getSource().getName().equals(columName)){
|
|
|
+ columnDataType.setColumnName(fieldMapping.getTarget().getName());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ddlConvertContext.setTargetSql(alter.toString());
|
|
|
+ }
|
|
|
+ } catch (JSQLParserException e) {
|
|
|
+// throw new RuntimeException(e);
|
|
|
+ ddlConvertContext.convertSql();
|
|
|
+ }
|
|
|
+ }else{//如果有一方不是db,解析sql,进行填充
|
|
|
+ ddlConvertContext.convertSql();
|
|
|
+ }
|
|
|
//进行sql执行
|
|
|
Result result = parserFactory.writeSql(ddlConvertContext, generalExecutor);
|
|
|
flushStrategy.flushIncrementData(mapping.getMetaId(), result, event);
|