Групна обрада - типична за масовно оријентисано, неинтерактивно и често дуготрајно извршавање у позадини - широко се користи у готово свакој индустрији и примењује се на широк спектар задатака. Групна обрада може бити интензивна у подацима или рачунски, извршавати се секвенцијално или паралелно и може се покренути кроз различите моделе позивања, укључујући ад хоц, заказане и на захтев.
Овај водич за Спринг Батцх објашњава модел програмирања и доменски језик батцх апликација уопште, а посебно приказује неке корисне приступе дизајнирању и развоју батцх апликација користећи тренутну верзију Спринг Батцх 3.0.7 верзија.
Шта је Спринг Батцх?
Спринг Батцх је лаган, свеобухватан оквир дизајниран да олакша развој робусних батцх апликација. Такође пружа напредније техничке услуге и функције које подржавају серијске послове изузетно великог обима и високих перформанси кроз своје технике оптимизације и партиционирања. Спринг Батцх се надовезује на Заснован на ПОЈО развојни приступ Спринг Фрамеворк , свима познат искусни пролећни програмери .
Као пример, овај чланак разматра изворни код из узорка пројекта који учитава КСМЛ-обликовану датотеку купаца, филтрира купце према различитим атрибутима и износе филтриране уносе у текстуалну датотеку. Изворни код за наш пример Спринг Батцх-а (који користи Ломбок напомене) је доступан овде на ГитХуб-у и захтева Јава СЕ 8 и Мавен.
Важно је да било који програмер серије буде упознат са главним концептима серијске обраде и да му буде угодно. Дијаграм у наставку представља поједностављену верзију батцх референтне архитектуре која је доказана деценијама имплементације на многим различитим платформама. Представља кључне концепте и појмове релевантне за шаржну обраду, које користи Спринг Батцх.
Као што је приказано у нашем примеру групне обраде, пакетни процес се обично инкапсулира са Job
који се састоји од више Step
с. Сваки Step
типично има један ItemReader
, ItemProcessor
и ItemWriter
. А Job
извршава се JobLauncher
, а метаподаци о конфигурисаним и извршеним пословима се чувају у JobRepository
.
Сваки Job
могу бити повезани са више JobInstance
с, од којих је свака јединствено дефинисана својим одређеним JobParameters
који се користе за покретање серијског посла. Свако покретање JobInstance
означава се као JobExecution
. Сваки JobExecution
обично прати шта се догодило током трчања, као што су тренутни и излазни статуси, време почетка и завршетка итд.
А Step
је независна, специфична фаза серије Job
, таква да је свака Job
се састоји од једног или више Step
с. Слично а Job
, а Step
има појединца StepExecution
који представља један покушај извршавања Step
. StepExecution
чува информације о тренутном и излазном статусу, времену почетка и завршетка итд., као и референце на одговарајуће Step
и JobExecution
инстанци.
Ан ExecutionContext
је скуп парова кључ / вредност који садрже информације које се примењују на StepExecution
или JobExecution
. Спринг Батцх наставља ExecutionContext
, што помаже у случајевима када желите да поново покренете серију (нпр. Када се догоди фатална грешка итд.). Све што је потребно је ставити било који објекат који ће се делити између корака у контекст, а оквир ће се побринути за остало. Након поновног покретања, вредности из претходних ExecutionContext
се обнављају из базе података и примењују.
JobRepository
је механизам у Спринг Батцх-у који омогућава сву ову истрајност. Пружа ЦРУД операције за JobLauncher
, Job
и Step
инстанци. Једном а Job
је покренут, JobExecution
се добија из спремишта и током извршења StepExecution
и JobExecution
инстанце се задржавају у спремишту.
Једна од предности Спринг Батцх-а је та што су пројектне зависности минималне, што олакшава брзо покретање и покретање. Неколико зависности које постоје јасно су прецизиране и објашњене у pom.xml
пројекту, којем се може приступити овде .
Стварно покретање апликације се дешава у класи која изгледа отприлике овако:
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }
Тхе @EnableBatchProcessing
напомена омогућава функције Спринг Батцх-а и пружа основну конфигурацију за подешавање групних послова.
Тхе @SpringBootApplication
напомена потиче из Спринг Боот пројекат који нуди самосталне апликације за пролеће, спремне за производњу. Одређује класу конфигурације која декларише један или више Спринг пасуља, а такође покреће аутоматску конфигурацију и Спринг-ово скенирање компонената.
Наш пример узорка има само један посао који је конфигурисан са CustomerReportJobConfig
са убризганим JobBuilderFactory
и StepBuilderFactory
. Минимална конфигурација посла може се дефинисати у CustomerReportJobConfig
као што следи:
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get('customerReportJob') .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get('taskletStep') .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }
Постоје два главна приступа за изградњу степенице.
Један од приступа, као што је приказано у горњем примеру, је заснован на задацима . А Tasklet
подржава једноставан интерфејс који има само један метод, execute()
, који се позива више пута док се не врати RepeatStatus.FINISHED
или избаци изузетак да сигнализира неуспех. Сваки позив на Tasklet
је умотан у трансакцију.
Други приступ, обрада усмерена на комаде , односи се на секвенцијално читање података и стварање „делова“ који ће бити записани у границама трансакције. Свака појединачна ставка се чита из ItemReader
, предаје у ItemProcessor
и агрегира. Једном када је број прочитаних ставки једнак интервалу урезивања, цео део се записује преко ItemWriter
, а затим се трансакција извршава. Корак оријентисан на комаде може се конфигурисати на следећи начин:
@Bean public Job customerReportJob() { return jobBuilders.get('customerReportJob') .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get('chunkStep') .chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
Тхе chunk()
метода гради корак који обрађује ставке у комадима са предвиђеном величином, при чему се сваки део затим прослеђује наведеном читачу, процесору и писцу. О овим методама се детаљније говори у следећим одељцима овог чланка.
За нашу апликацију узорка Спринг Батцх, да бисмо прочитали листу купаца из КСМЛ датотеке, морамо да обезбедимо имплементацију интерфејса org.springframework.batch.item.ItemReader
:
public interface ItemReader { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
Ан ItemReader
пружа податке и очекује се да буде са статусом државе. Обично се позива више пута за сваку серију, са сваким позивом на read()
враћање следеће вредности и коначно враћање null
када су сви улазни подаци исцрпљени.
Спринг Батцх пружа неке готове имплементације ItemReader
, које се могу користити у разне сврхе, као што су читање колекција, датотека, интеграција ЈМС-а и ЈДБЦ-а, као и више извора, и тако даље.
У нашем примеру пријаве, CustomerItemReader
стварни делегати разреда read()
позива на лењо иницијализовану инстанцу IteratorItemReader
класа:
public class CustomerItemReader implements ItemReader { private final String filename; private ItemReader delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader(customers()); } return delegate.read(); } private List customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List) decoder.readObject(); } } }
Спринг беан за ову имплементацију креира се са @Component
и @StepScope
напомене, дајући Спрингу до знања да је ова класа компонента Спринг са опсегом и креираће се једном по извршењу корака, како следи:
@StepScope @Bean public ItemReader reader() { return new CustomerItemReader(XML_FILE); }
ItemProcessors
трансформишу улазне ставке и уводе пословну логику у сценариј обраде оријентисан на ставке. Морају да обезбеде имплементацију интерфејса org.springframework.batch.item.ItemProcessor
:
public interface ItemProcessor { O process(I item) throws Exception; }
Метода process()
прихвата један примерак I
класе и може или не мора да врати инстанцу истог типа. Повратак null
указује на то да предмет не би требало да се даље обрађује. Као и обично, Спринг нуди неколико стандардних процесора, као што је CompositeItemProcessor
који пролази кроз низ убризганих ItemProcessor
с и ValidatingItemProcessor
која потврђује унос.
У случају наше примере апликације, процесори се користе за филтрирање купаца према следећим захтевима:
Захтев за „текући месец“ се примењује путем прилагођеног ItemProcessor
:
public class BirthdayFilterProcessor implements ItemProcessor { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }
Захтев за „ограничени број трансакција“ примењује се као ValidatingItemProcessor
:
public class TransactionValidatingProcessor extends ValidatingItemProcessor { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException('Customer has less than ' + limit + ' transactions'); } } ); setFilter(true); } }
Овај пар процесора се затим инкапсулира у CompositeItemProcessor
који имплементира образац делегата:
@StepScope @Bean public ItemProcessor processor() { final CompositeItemProcessor processor = new CompositeItemProcessor(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }
За излаз података, Спринг Батцх пружа интерфејс org.springframework.batch.item.ItemWriter
за сериализацију објеката по потреби:
public interface ItemWriter { void write(List items) throws Exception; }
Тхе write()
метод је одговоран за осигурање да су сви интерни бафери испрани. Ако је трансакција активна, обично ће бити потребно одбацити излаз при следећем враћању. Ресурс коме писац шаље податке обично би требао сам да то реши. Постоје стандардне имплементације као што су CompositeItemWriter
, JdbcBatchItemWriter
, JmsItemWriter
, JpaItemWriter
, SimpleMailMessageItemWriter
и друге.
У нашем узорку пријаве, списак филтрираних купаца написан је на следећи начин:
public class CustomerItemWriter implements ItemWriter, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream('output.txt'); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }
Подразумевано, Спринг Батцх извршава све послове које може да пронађе (тј. Који су конфигурисани као у CustomerReportJobConfig
) при покретању. Да бисте променили ово понашање, онемогућите извршавање посла приликом покретања додавањем следећег својства у application.properties
:
spring.batch.job.enabled=false
Стварно заказивање се тада постиже додавањем @EnableScheduling
напомена класи конфигурације и @Scheduled
напомена за методу која извршава сам посао. Заказивање се може конфигурисати са кашњењем, брзинама или црон изразима:
// run every 5000 msec (i.e., every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }
Постоји проблем са горњим примером. У време извођења посао ће успети само први пут. Када се покрене други пут (тј. Након пет секунди), генерисаће следеће поруке у евиденцијама (имајте на уму да би у претходним верзијама Спринг Батцх-а био бачен JobInstanceAlreadyCompleteException
):
INFO 36988 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
То се дешава зато што се могу креирати и извршити само јединствени JobInstance
и Спринг Батцх не може разликовати прву и другу JobInstance
.
Постоје два начина за избегавање овог проблема када заказујете серијски посао.
Једно је да у сваки посао унесете један или више јединствених параметара (нпр. Стварно време почетка у наносекундама):
@Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong('uniqueness', System.nanoTime()).toJobParameters() ); }
Следећи посао можете покренути у низу од JobInstance
с одређених JobParametersIncrementer
у прилогу наведеног посла са SimpleJobOperator.startNextInstance()
:
@Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }
Обично, за покретање јединичних тестова у апликацији Спринг Боот, оквир мора учитати одговарајући ApplicationContext
. У ту сврху се користе две напомене:
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})
Постоји класа корисности org.springframework.batch.test.JobLauncherTestUtils
за тестирање серијских послова. Пружа методе за покретање читавог посла, као и омогућавање тестирања појединачних корака од краја до краја, без потребе за извођењем сваког корака у послу. Мора се прогласити као пролећни пасуљ:
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }
Типичан тест за посао и корак изгледа на следећи начин (и такође може да користи било који подсмешни оквир):
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep('taskletStep').getStatus()); } }
Спринг Батцх уводи додатне циљеве за контекст корака и посла. Објекти у овим опсезима користе контејнер Спринг као фабрику објеката, тако да постоји само једна инстанца сваког таквог граха по кораку извршења или послу. Поред тога, пружа се подршка за касно везивање референци доступних са StepContext
или JobContext
. Компоненте које су у току извођења конфигурисане за опсег корака или посла је незгодно тестирати као самосталне компоненте, осим ако немате начин да поставите контекст као да су у кораку или извршењу посла. То је циљ org.springframework.batch.test.StepScopeTestExecutionListener
и org.springframework.batch.test.StepScopeTestUtils
компоненте у Спринг Батцх-у, као и JobScopeTestExecutionListener
и JobScopeTestUtils
.
Тхе TestExecutionListeners
су декларисани на нивоу класе, а његов посао је створити контекст извршења корака за сваку тест методу. На пример:
@RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName('name'); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }
Постоје два TestExecutionListener
с. Један је из редовног Спринг Тест оквира и управља убризгавањем зависности из конфигурисаног контекста апликације. Друга је Спринг Батцх StepScopeTestExecutionListener
који поставља контекст опсега корака за убризгавање зависности у јединствене тестове. А StepContext
креира се током трајања методе испитивања и ставља на располагање свим зависностима које се убризгавају. Подразумевано понашање је само стварање StepExecution
са фиксним својствима. Алтернативно, StepContext
тест може пружити као фабричка метода која враћа тачан тип.
Други приступ заснован је на StepScopeTestUtils
класа корисности. Ова класа се користи за креирање и манипулисање StepScope
у јединственим тестовима на флексибилнији начин без употребе ињекције зависности. На пример, читање ИД-а купца који је филтрирао горњи процесор може се извршити на следећи начин:
@Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName('name'); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }
Овај чланак представља неке од основа дизајна и развоја Спринг Батцх апликација. Међутим, постоји много напреднијих тема и могућности - попут скалирања, паралелне обраде, слушалаца и још много тога - о којима се у овом чланку не говори. Надам се да овај чланак пружа корисну основу за почетак.
Информације о овим напреднијим темама можете затим пронаћи у званична Спринг Бацк документација за Спринг Батцх.
Спринг Батцх је лаган, свеобухватан оквир дизајниран да олакша развој робусних батцх апликација. Такође пружа напредније техничке услуге и функције које подржавају серијске послове изузетно великог обима и високих перформанси кроз своје технике оптимизације и партиционирања.
„Корак“ је независна, специфична фаза серијског „посла“, тако да се сваки посао састоји од једног или више корака.
'ЈобРепоситори' је механизам у Спринг Батцх-у који омогућава сву ову постојаност. Пружа ЦРУД операције за инсталације ЈобЛаунцхер, Јоб и Степ.
Један од приступа заснован је на Тасклет-у, где Тасклет подржава једноставан интерфејс са једном извршном () методом. Други приступ, ** обрада усмерена на комаде **, односи се на секвенцијално читање података и стварање „делова“ који ће бити записани у границама трансакције.