Партидната обработка - типично за групово ориентирано, неинтерактивно и често продължително фоново изпълнение - се използва широко във почти всяка индустрия и се прилага за разнообразен набор от задачи. Партидната обработка може да бъде интензивна за данни или изчислителна, да се изпълнява последователно или паралелно и може да бъде инициирана чрез различни модели за извикване, включително ad hoc, планирани и при поискване.
Този урок Spring Batch обяснява модела на програмиране и езика на домейна на пакетните приложения като цяло и по-специално показва някои полезни подходи за проектирането и разработването на пакетни приложения, използвайки текущата Пролетна партида 3.0.7 версия.
Какво е Spring Batch?
Spring Batch е лека, всеобхватна рамка, предназначена да улесни разработването на стабилни партидни приложения. Той също така предоставя по-усъвършенствани технически услуги и функции, които поддържат изключително голям обем и високоефективни партидни задачи чрез своите техники за оптимизация и разделяне. Spring Batch се основава на Базиран на POJO подход за развитие на Пролетна рамка , познат на всички опитни пролетни разработчици .
Като пример, тази статия разглежда изходния код от примерен проект, който зарежда XML-форматиран клиентски файл, филтрира клиенти по различни атрибути и извежда филтрираните записи в текстов файл. Изходният код за нашия пример Spring Batch (който използва Ломбок анотации) е налице тук на GitHub и изисква Java SE 8 и Maven.
Важно е всеки разработчик на партиди да е запознат и да се чувства добре с основните концепции за партидна обработка. Диаграмата по-долу е опростена версия на пакетната референтна архитектура, която е доказана чрез десетилетия на внедряване на много различни платформи. Той представя ключовите понятия и термини, свързани с партидната обработка, използвани от Spring Batch.
Както е показано в нашия пример за групова обработка, пакетният процес обикновено се капсулира от Job
състоящ се от множество Step
s. Всеки Step
обикновено има единични ItemReader
, ItemProcessor
и ItemWriter
. A Job
се изпълнява от JobLauncher
, а метаданните за конфигурирани и изпълнени задачи се съхраняват в JobRepository
.
Всеки Job
могат да бъдат свързани с множество JobInstance
s, всяка от които е дефинирана уникално от конкретния си JobParameters
които се използват за стартиране на партидна работа. Всяко изпълнение на JobInstance
се означава като JobExecution
. Всеки JobExecution
обикновено проследява какво се е случило по време на изпълнение, като текущ и изходен статус, начално и крайно време и т.н.
A Step
е независима, специфична фаза на партида Job
, така че всяка Job
се състои от един или повече Step
s. Подобно на Job
, a Step
има физическо лице StepExecution
което представлява единичен опит за изпълнение на Step
. StepExecution
съхранява информацията за текущите и изходните състояния, началните и крайните часове и т.н., както и препратки към съответните му Step
и JobExecution
инстанции.
An ExecutionContext
е набор от двойки ключ-стойност, съдържащи информация, която се обхваща или StepExecution
или JobExecution
. Spring Batch продължава ExecutionContext
, което помага в случаите, когато искате да рестартирате пакетно изпълнение (например, когато е възникнала фатална грешка и т.н.). Всичко, което е необходимо, е да поставите всеки обект, който да бъде споделен между стъпките, в контекста, а рамката ще се погрижи за останалото. След рестартиране стойностите от предишните ExecutionContext
се възстановяват от базата данни и се прилагат.
JobRepository
е механизмът в Spring Batch, който прави възможна цялата тази упоритост. Той осигурява CRUD операции за JobLauncher
, Job
и Step
екземпляри. Веднъж a Job
стартира, JobExecution
се получава от хранилището и по време на изпълнението StepExecution
и JobExecution
екземплярите се запазват в хранилището.
Едно от предимствата на Spring Batch е, че зависимостите от проекти са минимални, което улеснява бързото стартиране и работа. Малкото зависимости, които съществуват, са ясно посочени и обяснени в pom.xml
на проекта, които могат да бъдат достъпни тук .
Реалното стартиране на приложението се случва в клас, изглеждащ по следния начин:
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }
@EnableBatchProcessing
анотацията дава възможност за Spring Batch функции и предоставя основна конфигурация за настройка на групови задачи.
@SpringBootApplication
анотацията идва от Пролетно зареждане проект, който осигурява самостоятелни, готови за производство, базирани на пролетта приложения. Той определя клас на конфигурация, който декларира един или повече Spring компоненти и също така задейства автоматично конфигуриране и сканиране на компоненти на Spring.
Нашият примерен проект има само една работа, която е конфигурирана от CustomerReportJobConfig
с инжектиран JobBuilderFactory
и StepBuilderFactory
. Минималната конфигурация на заданието може да бъде дефинирана в CustomerReportJobConfig
както следва:
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get('customerReportJob') .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get('taskletStep') .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }
Има два основни подхода за изграждане на стъпка.
Един от подходите, както е показано в горния пример, е базирани на задачи . A Tasklet
поддържа прост интерфейс, който има само един метод, execute()
, който се извиква многократно, докато или се върне RepeatStatus.FINISHED
или хвърля изключение, за да сигнализира за повреда. Всяко обаждане до Tasklet
е обвит в транзакция.
Друг подход, ориентирана към парчета обработка , се отнася до последователно четене на данните и създаване на „парчета“, които ще бъдат записани в границите на транзакцията. Всеки отделен елемент се чете от ItemReader
, предава се на ItemProcessor
и се обобщава. След като броят на прочетените елементи е равен на интервала на фиксиране, целият блок се записва чрез ItemWriter
и след това транзакцията се фиксира. Ориентирана към парчета стъпка може да бъде конфигурирана както следва:
@Bean public Job customerReportJob() { return jobBuilders.get('customerReportJob') .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get('chunkStep') .chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
chunk()
метод изгражда стъпка, която обработва елементи на парчета с предоставения размер, като всеки парче след това се предава на посочения четец, процесор и писател. Тези методи са разгледани по-подробно в следващите раздели на тази статия.
За нашето примерно приложение Spring Batch, за да прочетем списък с клиенти от XML файл, трябва да предоставим изпълнение на интерфейса org.springframework.batch.item.ItemReader
:
public interface ItemReader { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
An ItemReader
предоставя данните и се очаква да бъде с държавно състояние. Обикновено се извиква няколко пъти за всяка партида, като всяко повикване е read()
връщане на следващата стойност и накрая връщане null
когато всички изходни данни са изчерпани.
Spring Batch предоставя някои готови реализации на ItemReader
, които могат да се използват за различни цели като четене на колекции, файлове, интегриране на JMS и JDBC, както и множество източници и т.н.
В нашата примерна заявка, CustomerItemReader
делегати от класа действително read()
извиква лениво инициализиран екземпляр на IteratorItemReader
клас:
public class CustomerItemReader implements ItemReader { private final String filename; private ItemReader delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader(customers()); } return delegate.read(); } private List customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List) decoder.readObject(); } } }
Spring bean за това изпълнение се създава с @Component
и @StepScope
анотации, давайки на Spring да разбере, че този клас е компонент Spring с обхват и ще бъде създаден веднъж на изпълнение на стъпка, както следва:
@StepScope @Bean public ItemReader reader() { return new CustomerItemReader(XML_FILE); }
ItemProcessors
трансформира входните елементи и въведе бизнес логиката в сценарий за обработка, ориентиран към артикули. Те трябва да осигурят изпълнение на интерфейса org.springframework.batch.item.ItemProcessor
:
public interface ItemProcessor { O process(I item) throws Exception; }
Методът process()
приема един екземпляр на I
клас и може или не може да върне екземпляр от същия тип. Връща се null
показва, че артикулът не трябва да продължава да се обработва. Както обикновено, Spring предлага няколко стандартни процесора, като CompositeItemProcessor
който предава елемента през последователност от инжектирани ItemProcessor
s и ValidatingItemProcessor
което валидира въведеното.
В случая с нашето примерно приложение, процесорите се използват за филтриране на клиентите по следните изисквания:
Изискването за „текущ месец“ се изпълнява чрез персонализирано ItemProcessor
:
public class BirthdayFilterProcessor implements ItemProcessor { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }
Изискването за „ограничен брой транзакции“ се прилага като ValidatingItemProcessor
:
public class TransactionValidatingProcessor extends ValidatingItemProcessor { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException('Customer has less than ' + limit + ' transactions'); } } ); setFilter(true); } }
След това тази двойка процесори се капсулира в рамките на CompositeItemProcessor
който реализира модела на делегат:
@StepScope @Bean public ItemProcessor processor() { final CompositeItemProcessor processor = new CompositeItemProcessor(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }
За извеждане на данните Spring Batch предоставя интерфейс org.springframework.batch.item.ItemWriter
за сериализиране на обекти, ако е необходимо:
public interface ItemWriter { void write(List items) throws Exception; }
write()
метод е отговорен за гарантиране, че всички вътрешни буфери са измити. Ако транзакцията е активна, обикновено ще е необходимо изхвърлянето на изхода при последващо връщане назад. Ресурсът, към който пишещият изпраща данни, обикновено трябва да може да се справи сам с това. Има стандартни изпълнения като CompositeItemWriter
, JdbcBatchItemWriter
, JmsItemWriter
, JpaItemWriter
, SimpleMailMessageItemWriter
и други.
В нашето примерно приложение списъкът с филтрирани клиенти се изписва, както следва:
public class CustomerItemWriter implements ItemWriter, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream('output.txt'); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }
По подразбиране Spring Batch изпълнява всички задачи, които може да намери (т.е. които са конфигурирани както в CustomerReportJobConfig
) при стартиране. За да промените това поведение, деактивирайте изпълнението на заданието при стартиране, като добавите следното свойство към application.properties
:
spring.batch.job.enabled=false
Тогава действителното планиране се постига чрез добавяне на @EnableScheduling
анотация към конфигурационен клас и @Scheduled
анотация към метода, който изпълнява самата работа. Графикът може да бъде конфигуриран със закъснение, скорост или cron изрази:
// run every 5000 msec (i.e., every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }
Има проблем с горния пример обаче. По време на изпълнение работата ще успее само за първи път. Когато стартира за втори път (т.е. след пет секунди), той ще генерира следните съобщения в дневниците (имайте предвид, че в предишните версии на Spring Batch | | + + _ | би било хвърлено):
JobInstanceAlreadyCompleteException
Това се случва, защото могат да бъдат създадени и изпълнени само уникални INFO 36988 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
и Spring Batch няма начин да прави разлика между първата и втората JobInstance
.
Има два начина за избягване на този проблем, когато планирате пакетна работа.
Едно е да сте сигурни, че въвеждате един или повече уникални параметри (например действителното начално време в наносекунди) за всяка работа:
JobInstance
Като алтернатива можете да стартирате следващото задание в последователност от @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong('uniqueness', System.nanoTime()).toJobParameters() ); }
s, определена от JobInstance
прикачен към посочената работа с JobParametersIncrementer
:
SimpleJobOperator.startNextInstance()
Обикновено, за да стартирате модулни тестове в приложение за Spring Boot, рамката трябва да зареди съответната @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }
. За тази цел се използват две анотации:
ApplicationContext
Има полезност клас @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})
за тестване на партидни задачи. Той предоставя методи за стартиране на цяла работа, както и позволява тестване на отделни стъпки от край до край, без да се налага да изпълнявате всяка стъпка в заданието. Той трябва да бъде деклариран като пролетен боб:
org.springframework.batch.test.JobLauncherTestUtils
Типичен тест за работа и стъпка изглежда по следния начин (и може да използва и всякакви подигравателни рамки):
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }
Spring Batch въвежда допълнителни обхвати за контекст на стъпка и работа. Обектите в тези обхвати използват контейнера Spring като фабрика на обекти, така че има само един екземпляр на всеки такъв компонент за стъпка или задача за изпълнение. Освен това се предоставя подкрепа за късно свързване на референции, достъпни от @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep('taskletStep').getStatus()); } }
или StepContext
. Компонентите, които са конфигурирани по време на изпълнение да бъдат с обхват на стъпка или работа, са трудни за тестване като самостоятелни компоненти, освен ако нямате начин да зададете контекста, сякаш са в изпълнение на стъпка или работа. Това е целта на JobContext
и org.springframework.batch.test.StepScopeTestExecutionListener
компоненти в Spring Batch, както и org.springframework.batch.test.StepScopeTestUtils
и JobScopeTestExecutionListener
.
JobScopeTestUtils
са декларирани на ниво клас и неговата работа е да създаде контекст за изпълнение на стъпка за всеки метод на тестване. Например:
TestExecutionListeners
Има две @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName('name'); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }
s. Единият е от обикновената рамка Spring Test и обработва инжектирането на зависимости от конфигурирания контекст на приложението. Другото е Spring Batch TestExecutionListener
който задава контекст на стъпка-обхват за инжектиране на зависимост в модулни тестове. A StepScopeTestExecutionListener
се създава по време на тестовия метод и се предоставя на всички инжекции, които се инжектират. Поведението по подразбиране е просто да се създаде StepContext
с фиксирани свойства. Алтернативно, StepExecution
може да се предостави от тестовия случай като фабричен метод, връщащ правилния тип.
Друг подход се основава на StepContext
клас на полезност. Този клас се използва за създаване и манипулиране StepScopeTestUtils
в модулни тестове по по-гъвкав начин, без да се използва инжектиране на зависимост. Например четенето на идентификатора на клиента, филтрирано от процесора по-горе, може да се извърши по следния начин:
StepScope
Тази статия представя някои от основите на проектирането и разработването на Spring Batch приложения. Има обаче много по-напреднали теми и възможности - като мащабиране, паралелна обработка, слушатели и много други, които не са разгледани в тази статия. Надяваме се, че тази статия предоставя полезна основа за започване.
След това информация за тези по-напреднали теми може да бъде намерена в официална документация Spring Back за Пролетна партида.
Spring Batch е лека, всеобхватна рамка, предназначена да улесни разработването на стабилни партидни приложения. Той също така предоставя по-усъвършенствани технически услуги и функции, които поддържат изключително голям обем и високоефективни партидни задачи чрез своите техники за оптимизация и разделяне.
„Стъпка“ е независима, специфична фаза на партида „Задача“, така че всяка работа се състои от една или повече стъпки.
„JobRepository“ е механизмът в Spring Batch, който прави възможно всичко това постоянство. Той осигурява CRUD операции за екземпляри на JobLauncher, Job и Step.
Един от подходите е базиран на Tasklet, където Tasklet поддържа прост интерфейс с един метод за изпълнение (). Другият подход, ** обработка, ориентирана към парчета **, се отнася до последователно четене на данните и създаване на „парчета“, които ще бъдат записани в границите на транзакцията.