扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Spring Batch官网介绍:
十多年的托克逊网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都全网营销推广的优势是能够根据用户设备显示端的尺寸不同,自动调整托克逊建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联从事“托克逊网站设计”,“托克逊网站推广”以来,每个客户项目都认真落实执行。
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)
springbatch
主要实现批量数据的处理,我对batch进行的封装,提出了jobBase类型,具体job需要实现它即可。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。
几个组件
•job
•step
•read
•write
•listener
•process
•validator
JobBase定义了几个公用的方法
/** * springBatch的job基础类. */ public abstract class JobBase{ /** * 批次. */ protected int chunkCount = 5000; /** * 监听器. */ private JobExecutionListener jobExecutionListener; /** * 处理器. */ private ValidatingItemProcessor validatingItemProcessor; /** * job名称. */ private String jobName; /** * 检验器. */ private Validator validator; @Autowired private JobBuilderFactory job; @Autowired private StepBuilderFactory step; /** * 初始化. * * @param jobName job名称 * @param jobExecutionListener 监听器 * @param validatingItemProcessor 处理器 * @param validator 检验 */ public JobBase(String jobName, JobExecutionListener jobExecutionListener, ValidatingItemProcessor validatingItemProcessor, Validator validator) { this.jobName = jobName; this.jobExecutionListener = jobExecutionListener; this.validatingItemProcessor = validatingItemProcessor; this.validator = validator; } /** * job初始化与启动. */ public Job getJob() throws Exception { return job.get(jobName).incrementer(new RunIdIncrementer()) .start(syncStep()) .listener(jobExecutionListener) .build(); } /** * 执行步骤. * * @return */ public Step syncStep() throws Exception { return step.get("step1") . chunk(chunkCount) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } /** * 单条处理数据. * * @return */ public ItemProcessor processor() { validatingItemProcessor.setValidator(processorValidator()); return validatingItemProcessor; } /** * 校验数据. * * @return */ @Bean public Validator processorValidator() { return validator; } /** * 批量读数据. * * @return * @throws Exception */ public abstract ItemReader reader() throws Exception; /** * 批量写数据. * * @return */ @Bean public abstract ItemWriter writer(); }
主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体JOB去实现的。
具体Job实现
@Configuration @EnableBatchProcessing public class SyncPersonJob extends JobBase{ @Autowired private DataSource dataSource; @Autowired @Qualifier("primaryJdbcTemplate") private JdbcTemplate jdbcTemplate; /** * 初始化,规则了job名称和监视器. */ public SyncPersonJob() { super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>()); } @Override public ItemReader reader() throws Exception { StringBuffer sb = new StringBuffer(); sb.append("select * from person"); String sql = sb.toString(); JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader<>(); jdbcCursorItemReader.setSql(sql); jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class)); jdbcCursorItemReader.setDataSource(dataSource); return jdbcCursorItemReader; } @Override @Bean("personJobWriter") public ItemWriter writer() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter (); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider ()); String sql = "insert into person_export " + "(id,name,age,nation,address) " + "values(:id, :name, :age, :nation,:address)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } }
写操作需要定义自己的bean的声明
注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱
/** * 批量写数据. * * @return */ @Override @Bean("personVerson2JobWriter") public ItemWriterwriter() { }
添加一个api,手动触发
@Autowired SyncPersonJob syncPersonJob; @Autowired JobLauncher jobLauncher; void exec(Job job) throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(job, jobParameters); } @RequestMapping("/run1") public String run1() throws Exception { exec(syncPersonJob.getJob()); return "personJob success"; }
总结
以上所述是小编给大家介绍的springbatch的封装与使用实例详解,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流