|
@@ -1,10 +1,10 @@
|
|
package org.dbsyncer.parser.flush.impl;
|
|
package org.dbsyncer.parser.flush.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONException;
|
|
import com.alibaba.fastjson.JSONException;
|
|
-import org.dbsyncer.common.scheduled.ScheduledTaskJob;
|
|
|
|
-import org.dbsyncer.common.scheduled.ScheduledTaskService;
|
|
|
|
import org.dbsyncer.common.util.JsonUtil;
|
|
import org.dbsyncer.common.util.JsonUtil;
|
|
|
|
+import org.dbsyncer.parser.flush.BufferActuator;
|
|
import org.dbsyncer.parser.flush.FlushService;
|
|
import org.dbsyncer.parser.flush.FlushService;
|
|
|
|
+import org.dbsyncer.parser.flush.model.FlushRequest;
|
|
import org.dbsyncer.storage.SnowflakeIdWorker;
|
|
import org.dbsyncer.storage.SnowflakeIdWorker;
|
|
import org.dbsyncer.storage.StorageService;
|
|
import org.dbsyncer.storage.StorageService;
|
|
import org.dbsyncer.storage.constant.ConfigConstant;
|
|
import org.dbsyncer.storage.constant.ConfigConstant;
|
|
@@ -15,12 +15,10 @@ import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
|
import java.time.Instant;
|
|
import java.time.Instant;
|
|
-import java.util.*;
|
|
|
|
-import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
-import java.util.concurrent.Executor;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@@ -34,7 +32,7 @@ import java.util.stream.Collectors;
|
|
* @date 2020/05/19 18:38
|
|
* @date 2020/05/19 18:38
|
|
*/
|
|
*/
|
|
@Component
|
|
@Component
|
|
-public class FlushServiceImpl implements FlushService, ScheduledTaskJob {
|
|
|
|
|
|
+public class FlushServiceImpl implements FlushService {
|
|
|
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
@@ -45,23 +43,7 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob {
|
|
private SnowflakeIdWorker snowflakeIdWorker;
|
|
private SnowflakeIdWorker snowflakeIdWorker;
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
- private ScheduledTaskService scheduledTaskService;
|
|
|
|
-
|
|
|
|
- @Autowired
|
|
|
|
- private Executor taskExecutor;
|
|
|
|
-
|
|
|
|
- private Queue<Task> buffer = new ConcurrentLinkedQueue();
|
|
|
|
-
|
|
|
|
- private Queue<Task> temp = new ConcurrentLinkedQueue();
|
|
|
|
-
|
|
|
|
- private final Object LOCK = new Object();
|
|
|
|
-
|
|
|
|
- private volatile boolean running;
|
|
|
|
-
|
|
|
|
- @PostConstruct
|
|
|
|
- private void init() {
|
|
|
|
- scheduledTaskService.start("*/3 * * * * ?", this);
|
|
|
|
- }
|
|
|
|
|
|
+ private BufferActuator flushBufferActuator;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void asyncWrite(String type, String error) {
|
|
public void asyncWrite(String type, String error) {
|
|
@@ -94,68 +76,7 @@ public class FlushServiceImpl implements FlushService, ScheduledTaskJob {
|
|
return params;
|
|
return params;
|
|
}).collect(Collectors.toList());
|
|
}).collect(Collectors.toList());
|
|
|
|
|
|
- if (running) {
|
|
|
|
- temp.offer(new Task(metaId, list));
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- buffer.offer(new Task(metaId, list));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- if (running) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- synchronized (LOCK) {
|
|
|
|
- if (running) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- running = true;
|
|
|
|
- flush(buffer);
|
|
|
|
- running = false;
|
|
|
|
- try {
|
|
|
|
- TimeUnit.MILLISECONDS.sleep(10);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- logger.error(e.getMessage());
|
|
|
|
- }
|
|
|
|
- flush(temp);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void flush(Queue<Task> buffer) {
|
|
|
|
- if (!buffer.isEmpty()) {
|
|
|
|
- final Map<String, List<Map>> task = new LinkedHashMap<>();
|
|
|
|
- while (!buffer.isEmpty()) {
|
|
|
|
- Task t = buffer.poll();
|
|
|
|
- if (!task.containsKey(t.metaId)) {
|
|
|
|
- task.putIfAbsent(t.metaId, new LinkedList<>());
|
|
|
|
- }
|
|
|
|
- task.get(t.metaId).addAll(t.list);
|
|
|
|
- }
|
|
|
|
- task.forEach((metaId, list) -> {
|
|
|
|
- taskExecutor.execute(() -> {
|
|
|
|
- long now = Instant.now().toEpochMilli();
|
|
|
|
- try {
|
|
|
|
- storageService.addData(StorageEnum.DATA, metaId, list);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- logger.error("[{}]-flushData异常{}", metaId, list.size());
|
|
|
|
- }
|
|
|
|
- logger.info("[{}]-flushData{}条,耗时{}秒", metaId, list.size(), (Instant.now().toEpochMilli() - now) / 1000);
|
|
|
|
- });
|
|
|
|
- });
|
|
|
|
- task.clear();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final class Task {
|
|
|
|
- String metaId;
|
|
|
|
- List<Map> list;
|
|
|
|
-
|
|
|
|
- public Task(String metaId, List<Map> list) {
|
|
|
|
- this.metaId = metaId;
|
|
|
|
- this.list = list;
|
|
|
|
- }
|
|
|
|
|
|
+ flushBufferActuator.offer(new FlushRequest(metaId, list));
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|