插件有什么用?
插件是一种可扩展全量同步和增量同步实现数据转换的技术方式。通过插件可以接收同步数据,自定义同步到目标源的行数据,也能消费数据并实现更多业务场景。
如何开发插件?
<dependency>
<groupId>org.ghi</groupId>
<artifactId>dbsyncer-sdk</artifactId>
<version>[[${version}]]</version>
</dependency>
package org.test; import org.dbsyncer.sdk.connector.ConnectorInstance; import org.dbsyncer.sdk.connector.database.DatabaseConnectorInstance; import org.dbsyncer.sdk.connector.database.ds.SimpleConnection; import org.dbsyncer.sdk.plugin.PluginContext; import org.dbsyncer.sdk.spi.PluginService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyPlugin implements PluginService { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 全量同步/增量同步 * * @param convertContext */ @Override public void convert(PluginContext pluginContext) { // TODO 消费或处理数据 System.out.println("插件消费数据中..."); // 是否终止同步到目标库开关,默认false pluginContext.setTerminated(false); // 数据源表和目标源表 pluginContext.getSourceTableName(); pluginContext.getTargetTableName(); // 捕获的事件(INSERT/UPDATE/DELETE) pluginContext.getEvent(); // 数据源和目标源表全量或增量数据 pluginContext.getSourceList(); pluginContext.getTargetList(); // 获取目标库连接器实例(如果需要用到连接器,必须引入dbsyncer-connector-[[${version}]].jar) pluginContext.getTargetConnectorInstance(); } /** * 全量同步/增量同步完成后执行处理 * * @param context */ @Override public void postProcessAfter(PluginContext context) { // 完成同步后调用该方法 logger.info("插件正在处理同步成功的数据,目标源表:{},事件:{},条数:{}", context.getTargetTableName(), context.getEvent(), context.getTargetList().size()); ConnectorInstance connectorInstance = context.getSourceConnectorInstance(); // 获取关系型数据库连接,实现自己的业务逻辑... if (connectorInstance instanceof DatabaseConnectorInstance) { DatabaseConnectorInstance db = (DatabaseConnectorInstance) connectorInstance; // 方式一(推荐): String query = "select * from my_user"; db.execute(databaseTemplate -> databaseTemplate.queryForList(query)); // 方式二: SimpleConnection connection = null; try { // 通过JDBC访问数据库 connection = (SimpleConnection) db.getConnection(); } catch (Exception e) { e.printStackTrace(); } finally { if(connection != null){ connection.close(); } } } } /** * 重写方法:设置版本号 * * @return */ @Override public String getVersion() { return "1.0.0"; } /** * 重写方法:设置插件名称 * * @return */ @Override public String getName() { return "我的插件"; } }