Saturday, 8 April 2017

How to write a Spring Batch job

Create an XML file such as scheduler-batch-job-context.xml with the content below:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Scheduler batch job: a single chunk-oriented step that reads items, processes
  them and hands the results to a composite writer.
  The beans "service" and "errorListener" are referenced here but not defined in
  this file; they must be supplied by another part of the application context.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

 <batch:job id="schedulerJob">
  <batch:step id="scheduleStep">
   <batch:tasklet transaction-manager="jobTransactionManager">
    <!-- The commit interval is taken from the "commit.interval" job parameter. -->
    <batch:chunk reader="schedulerReader"
                 processor="schedulerProcessor"
                 writer="schedulerWriter"
                 commit-interval="#{jobParameters['commit.interval']}"/>
    <batch:listeners>
     <batch:listener ref="errorListener"/>
    </batch:listeners>
   </batch:tasklet>
  </batch:step>
 </batch:job>

 <!-- *** READING *** : step-scoped reader; init() is invoked as the bean's init-method -->
 <bean id="schedulerReader"
       class="com.reader.SchedulerReader"
       init-method="init"
       scope="step"
       p:service-ref="service"/>

 <!-- *** PROCESSING *** -->
 <bean id="schedulerProcessor"
       class="com.processor.SchedulerProcessor"
       scope="step"/>

 <!-- *** WRITING *** : delegates are invoked in the order listed -->
 <bean id="schedulerWriter"
       class="org.springframework.batch.item.support.CompositeItemWriter">
  <property name="delegates">
   <list>
    <ref bean="schedulerLogWriter"/>
    <ref bean="schedulerUpdateWriter"/>
   </list>
  </property>
 </bean>

 <!-- Adapter that forwards each written item to service.logChange -->
 <bean id="schedulerLogWriter"
       class="org.springframework.batch.item.adapter.ItemWriterAdapter"
       scope="step"
       p:targetObject-ref="service"
       p:targetMethod="logChange"/>

 <!-- Adapter that forwards each written item to service.updateSchedulerStatus -->
 <bean id="schedulerUpdateWriter"
       class="org.springframework.batch.item.adapter.ItemWriterAdapter"
       scope="step"
       p:targetObject-ref="service"
       p:targetMethod="updateSchedulerStatus"/>

</beans>



Create an XML file named applicationContext.xml (the name the launcher class further below loads from META-INF/spring/) with the content below:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Application-level infrastructure: property placeholders, component scanning,
  and the data source, transaction manager and JdbcTemplate used for the job's
  business data.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

 <!--
   Resolves ${...} placeholders from a properties file located under ${batch.home}.
   NOTE(review): this file is named card-support-batches.properties, whereas the
   launcher class reads batches.properties from the same directory; confirm that
   both names are intended.
 -->
 <context:property-placeholder
     ignore-unresolvable="true"
     location="file:///${batch.home}/card-support-batches.properties"/>

 <!-- Registers annotated components found under the "com" base package. -->
 <context:component-scan base-package="com"/>

 <!-- Data source for the job's business data, configured from the batch.data.database.* properties. -->
 <bean id="jobDataSource"
       class="org.springframework.jdbc.datasource.DriverManagerDataSource"
       p:driverClassName="${batch.data.database.driverClassName}"
       p:url="${batch.data.database.url}"
       p:username="${batch.data.database.username}"
       p:password="${batch.data.database.password}"/>

 <!-- Transaction manager referenced by the batch steps (transaction-manager="jobTransactionManager"). -->
 <bean id="jobTransactionManager"
       class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
       p:dataSource-ref="jobDataSource"/>

 <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
  <constructor-arg ref="jobDataSource"/>
 </bean>

</beans>



Create an XML file such as validator-batch-job-context.xml with the content below:


<?xml version="1.0" encoding="UTF-8"?>
<!--
  Validator batch job: a single chunk-oriented step that reads items, validates
  them and passes the results to a composite writer.
  The beans "service", "errorListener" and "jobTransactionManager" are referenced
  here but not defined in this file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:batch="http://www.springframework.org/schema/batch"
 xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

 <batch:job id="validatorJob">
  <batch:step id="validateValue">
   <batch:tasklet transaction-manager="jobTransactionManager" start-limit="10" >
    <!-- The commit interval is taken from the "commit.interval" job parameter. -->
    <batch:chunk
      reader="validatorReader"
      processor="validateProcessor"
      writer="compositeWriter"
      commit-interval="#{jobParameters['commit.interval']}">
    </batch:chunk>
    <batch:listeners>
     <batch:listener ref="errorListener" />
    </batch:listeners>
   </batch:tasklet>
  </batch:step>
 </batch:job>

 <!-- *** READING *** -->
 <!--
   NOTE(review): the class name below starts with a lowercase letter. The reader's
   source is not shown alongside this file; Java class names are case-sensitive,
   so confirm that this value matches the real class name exactly.
 -->
 <bean id="validatorReader"
   class="com.reader.validatorReader"
   init-method="init"
   scope="step">
  <property name="service" ref="service" />
 </bean>

 <!-- *** PROCESSING *** -->
 <!-- The class attribute must match the Java class name ValidatorProcessor exactly (case-sensitive). -->
 <bean id="validateProcessor"
   class="com.processor.ValidatorProcessor"
   scope="step"/>

 <!-- *** WRITING *** -->
 <bean id="compositeWriter"
   class="org.springframework.batch.item.support.CompositeItemWriter">
  <property name="delegates">
   <list>
    <ref bean="validatorLogWriter"/>
   </list>
  </property>
 </bean>

 <!-- Adapter that forwards each written item to service.logChange -->
 <bean id="validatorLogWriter"
   class="org.springframework.batch.item.adapter.ItemWriterAdapter"
   scope="step">
  <property name="targetObject" ref="service" />
  <property name="targetMethod" value="logChange" />
 </bean>

 <!-- Disabled second writer, kept for reference; it is not part of compositeWriter's delegates.
 <bean id="validatorUpdateWriter"
   class="org.springframework.batch.item.adapter.ItemWriterAdapter"
   scope="step">
  <property name="targetObject" ref="service" />
  <property name="targetMethod" value="logChange" />
 </bean>
 -->

</beans>


Create an XML file such as launch-context.xml with the content below:


<?xml version="1.0" encoding="UTF-8"?>
<!--
  Batch launch infrastructure: step scope, the control database used by the job
  repository, and the job launcher.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.0.xsd
         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

 <!-- Explicit step scope declaration with class-based (proxyTargetClass) scoped proxies. -->
 <bean class="org.springframework.batch.core.scope.StepScope"
       p:proxyTargetClass="true"/>

 <!-- Data source for the batch control tables, configured from the batch.control.database.* properties. -->
 <bean id="jobControlDataSource"
       class="org.springframework.jdbc.datasource.DriverManagerDataSource"
       p:driverClassName="${batch.control.database.driverClassName}"
       p:url="${batch.control.database.url}"
       p:username="${batch.control.database.username}"
       p:password="${batch.control.database.password}"/>

 <bean id="jobControlTransactionManager"
       class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
       p:dataSource-ref="jobControlDataSource"/>

 <!-- Job repository backed by the control data source and its own transaction manager. -->
 <batch:job-repository id="jobRepository"
                       data-source="jobControlDataSource"
                       transaction-manager="jobControlTransactionManager"
                       isolation-level-for-create="DEFAULT"/>

 <bean id="jobLauncher"
       class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
       p:jobRepository-ref="jobRepository"/>

</beans>

Create an error-listener class like the one below:

---------------

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.core.listener.ItemListenerSupport;

import org.springframework.context.annotation.Scope;

import org.springframework.stereotype.Component;

@Component

@Scope(value="step")

public class ErrorListener extends ItemListenerSupport<Object, Object> {

 private static final Logger LOGGER = LoggerFactory.getLogger(ErrorListener.class);

    public void onReadError(Exception ex) {

     LOGGER.error("Encountered error on read", ex);

    }

    public void onWriteError(Exception ex, Object item) {

     LOGGER.error("Encountered error on write", ex);

    }

}

Create a Java main class that loads the XML configuration files and launches the scheduler job:


import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.util.Date;

import java.util.Properties;

import org.springframework.batch.core.ExitStatus;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import org.springframework.batch.core.JobParameters;

import org.springframework.batch.core.JobParametersBuilder;

import org.springframework.batch.core.JobParametersInvalidException;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;

import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;

import org.springframework.batch.core.repository.JobRestartException;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import org.springframework.context.support.AbstractApplicationContext;

public class JobRunnerScheduler {

 public static void main(String... args) {

  /*

  if(args.length != 1) {

    System.out.println("Enter input parameter - source filename without path.");

    System.exit(1);

  }

  */



  ApplicationContext context = null;

  Job job = null;



  //String sourceFileName = args[0];



  context = new ClassPathXmlApplicationContext(

    "classpath*:META-INF/spring/applicationContext.xml",

    "classpath:META-INF/spring/batch/launch-context.xml",

    "classpath:META-INF/spring/batch/scheduler-batch-job-context.xml");



  ((AbstractApplicationContext)context).registerShutdownHook();



  job = (Job) context.getBean("schedulerJob");



  JobLauncher jobLauncher = context.getBean(JobLauncher.class);



  JobParameters jobParams = createJobParameters(null);



  JobExecution jobExec = null;

  try {

   jobExec = jobLauncher.run(job, jobParams);

  } catch (JobExecutionAlreadyRunningException e) {

   e.printStackTrace();

  } catch (JobRestartException e) {

   e.printStackTrace();

  } catch (JobInstanceAlreadyCompleteException e) {

   e.printStackTrace();

  } catch (JobParametersInvalidException e) {

   e.printStackTrace();

  }



  if ( (jobExec.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode()))

  || (jobExec.getExitStatus().getExitCode().equals(ExitStatus.NOOP.getExitCode())) ) {

   System.exit(0);

  } else {

   System.exit(2);

  }

 }

 private static JobParameters createJobParameters(String sourceFile) {

  String commitInterval = null;

  Properties prop = new Properties();

  InputStream input = null;

  try {

   input = new FileInputStream(System.getProperty("batch.home")

     + "/batches.properties");

   prop.load(input);

   commitInterval = prop.getProperty("jobParam.commit.interval");

  } catch (IOException ex) {

   ex.printStackTrace();

  } finally {

   if (input != null) {

    try {

     input.close();

    } catch (IOException e) {

     e.printStackTrace();

    }

   }

  }



  return new JobParametersBuilder()

    .addString("commit.interval", commitInterval)  //number of rows processed before commit

    .addDate("sysdate", new Date())      //variable parameter to allow repeated processing

    .toJobParameters();

 }

}


Create a processor class like the one below:


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

import org.springframework.beans.factory.annotation.Autowired;

import com.dto.Dto;

import com.dto.WriterDto;

import com.service.Service;

public class SchedulerProcessor implements ItemProcessor<Dto, WriterDto> {

 private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerProcessor.class);

 @Autowired

 private Service service;


 @Override

 public WriterDto process(Dto dto) throws Exception {

  LOGGER.debug("Processing  scheduler " + dto.getId()

    ");



  WriterDto writerDto = null;

  try {

   writerDto = service.enableOrDisable(dto);

  } catch (Exception ex) {

   LOGGER.error("SchedulerProcessor error", ex);

   throw ex;

  }

  return writerDto;

 }

}




import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

import org.springframework.beans.factory.annotation.Autowired;

import com.dto.Dto;

import com.dto.WriterDto;

import com.service.Service;

public class Processor implements ItemProcessor<Dto, Dto> {

 private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);

 @Autowired

 private Service service;


 @Override

 public Dto process(Dto dto) throws Exception {

  LOGGER.debug("Processing  update from TIM " + dto.getId()

    ");


  try {

   return service.updateOrigValuesFromTim(dto);

  } catch (Exception ex) {

   LOGGER.error("Processor error", ex);

   throw ex;

  }

 }

}

Create a validator processor class like the one below:

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

import org.springframework.beans.factory.annotation.Autowired;

import com.dto.Dto;

import com.dto.WriterDto;

import com.service.Service;

public class ValidatorProcessor implements ItemProcessor<Dto, WriterDto> {

 private static final Logger LOGGER = LoggerFactory.getLogger(ValidatorProcessor.class);


 @Autowired

 private Service service;


 @Override

 public WriterDto process(Dto dto) throws Exception {

  LOGGER.debug("Processing  scheduler " + dto.getId()

    ");



  WriterDto writerDto = null;

  try {

   writerDto = service.validateValue(dto);

  } catch (Exception ex) {

   LOGGER.error("ValidatorProcessor error", ex);

   throw ex;

  }

  return writerDto;

 }

}

Create a reader class like the one below:

import java.util.List;

import org.springframework.batch.item.ItemReader;

import org.springframework.batch.item.NonTransientResourceException;

import org.springframework.batch.item.ParseException;

import org.springframework.batch.item.UnexpectedInputException;

import com.dto.Dto;

import com.service.Service;

public class SchedulerReader implements ItemReader<Dto> {

 List<Dto> items;


 private Service Service;


 public void init() throws Exception {

  items = Service.getsForScheduler();

 }


 @Override

 public Dto read()

   throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        if (!items.isEmpty()) {

            return items.remove(0);

        }

        return null;

 }

 public Service getService() {

  return Service;

 }

 public void setService(Service Service) {

  this.Service = Service;

 }

}


No comments:

Post a Comment