FlushServiceImpl.java 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package org.dbsyncer.parser.flush;
  2. import org.dbsyncer.common.util.JsonUtil;
  3. import org.dbsyncer.storage.SnowflakeIdWorker;
  4. import org.dbsyncer.storage.StorageService;
  5. import org.dbsyncer.storage.constant.ConfigConstant;
  6. import org.dbsyncer.storage.enums.StorageEnum;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.util.*;
  12. /**
  13. * 持久化
  14. * <p>全量或增量数据</p>
  15. * <p>系统日志</p>
  16. *
  17. * @author AE86
  18. * @version 1.0.0
  19. * @date 2020/05/19 18:38
  20. */
  21. @Component
  22. public class FlushServiceImpl implements FlushService {
  23. private final Logger logger = LoggerFactory.getLogger(getClass());
  24. @Autowired
  25. private StorageService storageService;
  26. @Autowired
  27. private SnowflakeIdWorker snowflakeIdWorker;
  28. @Override
  29. public void asyncWrite(String metaId, String error) {
  30. Map<String, Object> params = new HashMap();
  31. params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
  32. params.put(ConfigConstant.CONFIG_MODEL_TYPE, metaId);
  33. params.put(ConfigConstant.CONFIG_MODEL_JSON, error);
  34. params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, System.currentTimeMillis());
  35. storageService.addLog(StorageEnum.LOG, params);
  36. }
  37. @Override
  38. public void asyncWrite(String metaId, Queue<Map<String, Object>> data) {
  39. List<Map> list = new LinkedList<>();
  40. long now = System.currentTimeMillis();
  41. Map<String, Object> params = null;
  42. while (!data.isEmpty()){
  43. params = new HashMap();
  44. params.put(ConfigConstant.CONFIG_MODEL_ID, snowflakeIdWorker.nextId());
  45. params.put(ConfigConstant.CONFIG_MODEL_JSON, JsonUtil.objToJson(data.poll()));
  46. params.put(ConfigConstant.CONFIG_MODEL_CREATE_TIME, now);
  47. list.add(params);
  48. }
  49. storageService.addData(StorageEnum.DATA, metaId, list);
  50. }
  51. }