Skip to content

简介

反向etl区别与etl,其主要目的在于数据仓库到下游系统, etl更多的是为了 数仓的创建. 反向etl目的在于提供一个标准声明,方便下游app共享基础能力和扩展. 用途: 主数据订阅/第三方数据订阅等

核心组件

  • easybatch 批处理
  • xxljob 分布式调度
  • pf4j 插件化开发
  • dremio 数据湖(大于数仓,提供更强大的olap能力)

easybatch 批处理结合 xxljob调度即可实现 简单可靠的数据分发功能,配合数据湖/pf4j 插件开发,可以实现相当灵活的 : 反向etl功能/数据订阅.

java
public interface IBatch<SOURCE, SINK> {
    
    JobExecutor jobExecutor();
    
    RecordReader reader(SOURCE i);
    
    RecordWriter writer(BatchResult result, SINK o);
    
    RecordMapper mapper();
    
    default CustomPipelineListener customPipelineListener(String jobId) {
        return new CustomPipelineListener(jobId);
    }
    
    default CustomWriterListener customWriterListener(String jobId) {
        return new CustomWriterListener(jobId);
    }
    
    default CustomReaderListener customReaderListener(String jobId) {
        return new CustomReaderListener(jobId);
    }
    
    // 可选
    default RecordProcessor processor() {
        return null;
    }
    
    default BatchResult execute(SOURCE source, SINK sink) {
        BatchResult result = new BatchResult(UUID.randomUUID().toString(), XxlJobHelper.getJobId());
        JobExecutor jobExecutor = jobExecutor();
        JobBuilder jobBuilder = new JobBuilder();
        jobBuilder.reader(reader(source));
        jobBuilder.writer(writer(result, sink));
        //jobBuilder.batchSize(100);
        RecordMapper mapper = mapper();
        if (mapper != null) {
            jobBuilder.mapper(mapper);
        }
        RecordProcessor processor = processor();
        if (processor != null) {
            jobBuilder.processor(processor);
        }
        jobBuilder.readerListener(customReaderListener(result.getJobId()));
        jobBuilder.writerListener(customWriterListener(result.getJobId()));
        jobBuilder.pipelineListener(customPipelineListener(result.getJobId()));
        Job job = jobBuilder.build();
        JobReport jobReport = jobExecutor.execute(job);
        result.setJobReport(jobReport.getStatus().toString());
        return result;
    }
    
    
}
java
@Data
@AllArgsConstructor
@Slf4j
public class BatchResult {

    private String jobId;
    private long xxlJobId;
    /**
     * 总分发记录数
     */
    private Long totalNum;
    /**
     * 当前分发记录序号
     */
    private Long currentNum;
    /**
     * 单次写请求的数据量
     */
    private Integer batchSize;
    private Boolean success;
    private String msg;
    private String jobReport;
    private int successNum;
    private int errorNum;
}