Spring Batch: Simple Dynamic Batch Updates Across Multiple Tables

news/2025/2/26 15:11:07

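         The project has to process a set of tables. The tables are not very large and all of them carry longitude and latitude, but none of them has river-basin information, so the basin each row belongs to must be computed from its coordinates. It is a fairly simple project; I worked through it by following DeepSeek's suggestions. AI really is handy.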

       Installing the Alibaba AI personal edition in IDEA

      A step-by-step tutorial for using the full-strength DeepSeek model in IDEA

Code implementation

1. Controller

/**
 * Batch basin-processing job
 */
@Tag(name = "Batch basin-processing job")
@ApiSupport(order = 2)
@RequestMapping("/job")
@RestController
public class SysBatchJobController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobOperator jobOperator;

    @Autowired
    @Qualifier("updateWaterCodeJob")
    private Job updateWaterCodeJob;

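    // Launch the job; the data is updated page by page on multiple threads
    @GetMapping("/asyncJob")
    public Long asyncJob() throws Exception {
        // A unique "time" parameter makes every request start a new JobInstance
        JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();
        JobExecution run = jobLauncher.run(updateWaterCodeJob, jobParameters);
        // Return the execution id so the caller can look the run up later
        return run.getId();
    }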

}

2. Entity describing the tables to batch-process

/**
 * Information about a business table that needs batch processing
 */
@Builder
@AllArgsConstructor
@Data
@TableName(value = "ads_t_sys_batch_update_table")
public class SysBatchUpdateTable extends BaseEntity implements Serializable {

    private static final long serialVersionUID = -7367871287146067724L;

    @TableId(type = IdType.ASSIGN_UUID)
    private String pkId;

    /**
     * Name of the table to update
     **/
    @TableField(value = "table_name")
    private String tableName;

    /**
     * ID of the database that holds the table to update
     **/
    @TableField(value = "data_source_id")
    private String dataSourceId;

    /**
     * Primary-key column of the table
     **/
    @TableField(value = "key_id")
    private String keyId;

    /**
     * Basin-code column of the table
     **/
    @TableField(value = "water_code_column")
    private String waterCodeColumn;

    /**
     * Longitude column of the table
     **/
    @TableField(value = "lon_column")
    private String lonColumn;

    /**
     * Latitude column of the table
     **/
    @TableField(value = "lat_column")
    private String latColumn;


    public SysBatchUpdateTable() {
        
    }

}

3. Mapper. Passing parameters is rather cumbersome, so consider building them dynamically into the SQL.

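import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdatacd.panorama.system.domain.po.SysBatchUpdateTable;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;
import java.util.Map;

@Mapper
public interface UpdateTableMapper extends BaseMapper<SysBatchUpdateTable> {

    /**
     * Query one page of rows from a configured table.
     * The XML reads keyId, waterCodeColumn, lonColumn, latColumn and tableName from the map;
     * MyBatisPagingItemReader adds _pagesize and _skiprows.
     * @param params table and column names plus the paging values
     * @return one page of rows
     */
    List<Map<String, Object>> selectUpdateTableByPage(Map<String, Object> params);

    /**
     * Update the basin code of one row.
     * @param tableName       table name
     * @param waterCodeColumn name of the basin-code column
     * @param keyId           name of the primary-key column
     * @param waterCode       basin code
     * @param pkId            primary-key value of the row
     */
    void updateWaterCode(@Param("tableName") String tableName,
                         @Param("waterCodeColumn") String waterCodeColumn,
                         @Param("keyId") String keyId,
                         @Param("waterCode") String waterCode,
                         @Param("pkId") String pkId);
}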


<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdatacd.panorama.system.mapper.UpdateTableMapper">

    <!-- Dynamic paged query: ${} splices table and column names into the SQL, #{} binds values, so each row also carries the names the update needs -->
    <select id="selectUpdateTableByPage" resultType="java.util.HashMap">
        <!-- If a paged query is configured, use that SQL directly -->
        SELECT
        ${keyId} as pkId,
        #{keyId} as keyId,
        ${waterCodeColumn} as waterCode,
        #{waterCodeColumn} as waterCodeColumn,
        ${lonColumn} as lon,
        ${latColumn} as lat,
        #{tableName} as tableName
        FROM ${tableName}  a  where ${waterCodeColumn} is null
        ORDER BY ${keyId} <!-- keep the page order stable -->
        LIMIT #{_pagesize} OFFSET #{_skiprows}
    </select>

    <!-- Dynamic update -->
    <update id="updateWaterCode">
        UPDATE ${tableName}
        SET ${waterCodeColumn} = #{waterCode}
        WHERE ${keyId} = #{pkId} <!-- the primary-key column name is passed in as keyId -->
    </update>
</mapper>
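
One thing to watch in the paged query above: it selects only rows whose basin column is still NULL and pages with OFFSET, while the writer fills that same column between page reads. As far as I can tell, each later page is then taken from a result set that has already shrunk, so some rows are not visited in that run. The following is a minimal plain-Java simulation of that effect, not the project's code; the class `PagingSimulation`, the in-memory `TreeMap` table and the keyset variant (`id > lastId`) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PagingSimulation {

    // id -> waterCode (null = not computed yet); TreeMap iterates like ORDER BY id
    static TreeMap<Integer, String> newTable(int rows) {
        TreeMap<Integer, String> table = new TreeMap<>();
        for (int id = 0; id < rows; id++) {
            table.put(id, null);
        }
        return table;
    }

    // SELECT id WHERE water_code IS NULL ORDER BY id LIMIT size OFFSET offset
    static List<Integer> offsetPage(TreeMap<Integer, String> table, int size, int offset) {
        List<Integer> page = new ArrayList<>();
        int skipped = 0;
        for (Map.Entry<Integer, String> row : table.entrySet()) {
            if (row.getValue() != null) continue;          // already updated, filtered out
            if (skipped < offset) { skipped++; continue; } // OFFSET
            if (page.size() == size) break;                // LIMIT
            page.add(row.getKey());
        }
        return page;
    }

    // SELECT id WHERE water_code IS NULL AND id > lastId ORDER BY id LIMIT size
    static List<Integer> keysetPage(TreeMap<Integer, String> table, int size, int lastId) {
        List<Integer> page = new ArrayList<>();
        for (Map.Entry<Integer, String> row : table.tailMap(lastId, false).entrySet()) {
            if (row.getValue() != null) continue;
            if (page.size() == size) break;
            page.add(row.getKey());
        }
        return page;
    }

    // Read a page, update its rows, move to the next OFFSET; returns rows updated
    static int runOffset(int rows, int size) {
        TreeMap<Integer, String> table = newTable(rows);
        int updated = 0;
        for (int page = 0; ; page++) {
            List<Integer> ids = offsetPage(table, size, page * size);
            if (ids.isEmpty()) break;
            for (int id : ids) { table.put(id, "W"); updated++; }
        }
        return updated;
    }

    // Same loop, but each page starts after the last key already seen
    static int runKeyset(int rows, int size) {
        TreeMap<Integer, String> table = newTable(rows);
        int updated = 0;
        int lastId = -1;
        while (true) {
            List<Integer> ids = keysetPage(table, size, lastId);
            if (ids.isEmpty()) break;
            for (int id : ids) { table.put(id, "W"); updated++; }
            lastId = ids.get(ids.size() - 1);
        }
        return updated;
    }

    public static void main(String[] args) {
        System.out.println("OFFSET paging updated: " + runOffset(10, 2));
        System.out.println("keyset paging updated: " + runKeyset(10, 2));
    }
}
```

With 10 rows and a page size of 2, the OFFSET variant updates 6 rows and the keyset variant updates all 10. For this job, rerunning until no NULL rows remain, or paging on the key instead of an offset, would be two possible remedies; both are suggestions, not something the original project does.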

4. Configuration file

spring:
  batch:
    job:
      enabled: false   # do not run jobs at application startup
    jdbc:
      initialize-schema: always
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

Add the batch-processing parameter rewriteBatchedStatements=true to the data source URL
 url: jdbc:mysql://localhost:3306/xxxx?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true

5. Main configuration class (partitioned by table)

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;

// 1. Main configuration class (partitioned by table)
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Main job
     *
     * @return the job that updates the basin code of every configured table
     */
    @Bean("updateWaterCodeJob")
    public Job updateWaterCodeJob(
            @Qualifier("masterStep") Step masterStep
    ) {
        return jobBuilderFactory.get("updateWaterCodeJob")
                // BatchJobListener is a JobExecutionListener, so it is registered on the job
                .listener(new BatchJobListener())
                .start(masterStep)
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(
            @Qualifier("updateBatchTableData") Step updateBatchTableData,
            @Qualifier("multiTablePartitioner") MultiTablePartitioner multiTablePartitioner
    ) {
        return stepBuilderFactory.get("masterStep")
                .partitioner(updateBatchTableData.getName(), multiTablePartitioner) // the partitioner creates one partition per table
                .step(updateBatchTableData)
                .gridSize(10) // partitioned by table; concurrency is usually set to the number of CPU cores
                .taskExecutor(taskExecutor())
                .build();
    }

    // Thread pool configuration (core pool size = number of tables)
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("table-processor-");
        return executor;
    }

    /**
     * Step that reads, processes and updates the data page by page
     * @return the worker step run once per partition
     */
    @Bean("updateBatchTableData")
    public Step updateBatchTableData(
            @Qualifier("dynamicTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
            @Qualifier("batchUpdateWriter") BatchUpdateWriter batchUpdateWriter,
            @Qualifier("tableProcessor") TableProcessor tableProcessor

    ) {
        return stepBuilderFactory.get("updateBatchTableData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(tableProcessor)
                .writer(batchUpdateWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }


    /**
     * Read the rows to be updated, one page at a time
     * @return a step-scoped paging reader for the current partition's table
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> dynamicTableReader(
            @Value("#{stepExecutionContext['keyId']}") String keyId, // primary-key column of the table to update
            @Value("#{stepExecutionContext['waterCodeColumn']}") String waterCodeColumn, // basin-code column to update
            @Value("#{stepExecutionContext['lonColumn']}") String lonColumn, // longitude column
            @Value("#{stepExecutionContext['latColumn']}") String latColumn, // latitude column
            @Value("#{stepExecutionContext['tableName']}") String tableName // name of the table to update
            ) {

        MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("com.bigdatacd.panorama.system.mapper.UpdateTableMapper.selectUpdateTableByPage");
        Map<String,Object> param = new HashMap<>();
        param.put("keyId",keyId);
        param.put("waterCodeColumn",waterCodeColumn);
        param.put("lonColumn",lonColumn);
        param.put("latColumn",latColumn);
        param.put("tableName",tableName);
        reader.setParameterValues(param);
        reader.setPageSize(2000);
        return reader;
    }

}
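
A note on the `taskExecutor()` bean above: `ThreadPoolTaskExecutor` is backed by a `java.util.concurrent.ThreadPoolExecutor`, and its default queue capacity is `Integer.MAX_VALUE`. With an effectively unbounded queue, the pool only grows past its core size when the queue is full, so the maximum of 10 is unlikely to be reached and at most 5 tables would be processed at once. The sketch below shows that behaviour with the plain JDK executor; the class `PoolSizeDemo` and its method are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizeDemo {

    // Submits `tasks` blocking jobs and reports how many worker threads the pool created
    static int threadsUsed(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the measurement is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, the pool stays at the core size
        System.out.println("unbounded queue: " + threadsUsed(5, 10, Integer.MAX_VALUE, 10));
        // Small queue: once it is full, the pool grows towards the maximum size
        System.out.println("queue of 1:      " + threadsUsed(5, 10, 1, 10));
    }
}
```

If more than 5 tables should run in parallel, raising the core pool size or setting a small queue capacity on the executor are the usual options.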


import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

// Writer that performs the batch update
@Component("batchUpdateWriter")
@StepScope
public class BatchUpdateWriter implements ItemWriter<Map<String, Object>> {

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Map<String, Object>> items) {
        // Build the dynamic UPDATE statement; table and column names come from the first item of the chunk
        StringBuilder sb = new StringBuilder();
        sb.append("UPDATE ");
        sb.append((String) items.get(0).get("tableName"));
        sb.append(" SET ");
        sb.append((String) items.get(0).get("waterCodeColumn"));
        sb.append(" = :waterCode");
        sb.append(" WHERE ");
        sb.append((String) items.get(0).get("keyId"));
        sb.append(" = :pkId");

        jdbcTemplate.batchUpdate(sb.toString(), items.stream()
                .map(item -> new MapSqlParameterSource()
                        .addValue("waterCode", item.get("waterCode"))
                        .addValue("tableName", item.get("tableName"))
                        .addValue("waterCodeColumn", item.get("waterCodeColumn"))
                        .addValue("keyId", item.get("keyId"))
                        .addValue("pkId", item.get("pkId"))
                )
                .toArray(SqlParameterSource[]::new));
    }
}
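
Both the mapper's `${...}` placeholders and the writer above splice table and column names directly into SQL text, so those names cannot be protected by bind parameters. Since they come from the `ads_t_sys_batch_update_table` configuration table, it may be worth validating them before use. Below is a minimal sketch of such a check; the class `SqlIdentifiers`, its methods and the chosen pattern (letters, digits and underscores, at most 64 characters) are assumptions for illustration and would need adjusting to the naming rules of the actual database.

```java
import java.util.regex.Pattern;

class SqlIdentifiers {

    // Hypothetical whitelist: starts with a letter or underscore, then letters, digits, underscores
    private static final Pattern SAFE = Pattern.compile("[A-Za-z_][A-Za-z0-9_]{0,63}");

    // Returns the identifier unchanged if it matches the whitelist, otherwise throws
    static String requireSafe(String identifier) {
        if (identifier == null || !SAFE.matcher(identifier).matches()) {
            throw new IllegalArgumentException("Unsafe SQL identifier: " + identifier);
        }
        return identifier;
    }

    // Builds the same statement shape as the writer, with validated names and named value parameters
    static String buildUpdate(String table, String waterCodeColumn, String keyColumn) {
        return "UPDATE " + requireSafe(table)
                + " SET " + requireSafe(waterCodeColumn) + " = :waterCode"
                + " WHERE " + requireSafe(keyColumn) + " = :pkId";
    }

    public static void main(String[] args) {
        System.out.println(buildUpdate("ads_t_station", "water_code", "pk_id"));
        try {
            buildUpdate("ads_t_station; DROP TABLE x", "water_code", "pk_id");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The table and column names in `main` are made up; the values (`:waterCode`, `:pkId`) stay as bound parameters exactly as in the writer.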

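import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;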

@Component
@Slf4j
public class MultiTablePartitioner implements Partitioner {

    private final DataSource dataSource;

    public MultiTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sql = "SELECT key_id as keyId,water_code_column as waterCodeColumn,lon_column as lonColumn,lat_column as latColumn,page_sql as pageSql,table_name as tableName FROM ads_t_sys_batch_update_table where deleted = 0 and data_status = '0'";
        List<Map<String,Object>> tables = jdbcTemplate.queryForList(sql);
        log.info("Query: " + sql);
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            ExecutionContext ctx = new ExecutionContext();
            // Put the parameters into the step execution context; the dynamic batch-update SQL reads them from there
            ctx.putString("keyId", String.valueOf(tables.get(i).get("keyId")));
            ctx.putString("waterCodeColumn", String.valueOf(tables.get(i).get("waterCodeColumn")));
            ctx.putString("lonColumn", String.valueOf(tables.get(i).get("lonColumn")));
            ctx.putString("latColumn", String.valueOf(tables.get(i).get("latColumn")));
            //ctx.putString("pageSql", String.valueOf(tables.get(i).get("pageSql")));
            ctx.putString("tableName", String.valueOf(tables.get(i).get("tableName")));
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}


import com.bigdatacd.panorama.common.utils.GeoJsonUtil;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import java.util.Map;

// Looks up the basin that contains each row's longitude/latitude
@Component("tableProcessor")
public class TableProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        // Find the basin that contains the row's longitude/latitude
        item.put("waterCode", GeoJsonUtil.getWaterCode(Double.parseDouble(item.get("lon").toString()), Double.parseDouble(item.get("lat").toString())));
        return item;
    }
}
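
The processor above calls `GeoJsonUtil.getWaterCode` for every row, and that method, as listed in section 6, reads and parses the GeoJSON file on each call. Loading the basin data once and reusing it would likely be much cheaper. The following sketch shows one way to do that with the initialization-on-demand holder idiom; `BasinDataCache`, `loadBasins()` and the `String` stand-in for the parsed data are hypothetical and only illustrate the caching pattern, not GeoTools itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BasinDataCache {

    // Counts how often the expensive load runs, for demonstration only
    static final AtomicInteger loads = new AtomicInteger();

    // Stand-in for the expensive step (reading and parsing the GeoJSON file)
    static String loadBasins() {
        loads.incrementAndGet();
        return "basin-data";
    }

    // The JVM initializes this nested class once, lazily and thread-safely
    private static class Holder {
        static final String DATA = loadBasins();
    }

    // Every caller gets the same cached instance
    static String basins() {
        return Holder.DATA;
    }

    public static void main(String[] args) {
        basins();
        basins();
        System.out.println("loads: " + loads.get());
    }
}
```

Because the job runs partitions on several threads, the thread-safe initialization matters; whether the cached object itself may be shared between threads depends on the library type being cached.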


import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener
 */
public class BatchJobListener implements JobExecutionListener {

    private long beginTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  beforeJob...... " + beginTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  afterJob...... " + endTime);
        System.out.println(jobExecution.getJobInstance().getJobName()  + " took [" + (endTime - beginTime) + "] ms in total");
    }

}


 6. Utility class that computes the basin from longitude and latitude

import lombok.extern.slf4j.Slf4j;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.*;
import org.opengis.feature.Feature;
import org.opengis.feature.Property;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;


/**
 * @Description: GeoJSON utility class
 * @author: Mr.xulong
 * @date: 2023-01-09 14:39
 */
@Slf4j
public class GeoJsonUtil {

    /*public static void main(String[] args) {
        try {
            FeatureCollection featureCollection = getFeatureCollection("sichuanliuyu.json");
            double x = 106.955085;
            double y = 32.09546061139062;
            System.out.println(JSON.toJSONString(properties(x,y,featureCollection)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }*/
    private static String geoJsonFilePath = "sichuanliuyu.json";

    private GeoJsonUtil() {
    }
    /**
     * Load the collection of basin features
     *
     * @return the feature collection, or null if the file cannot be read
     */
    public static FeatureCollection getFeatureCollection() {
        // Read the GeoJSON file
        InputStream resourceAsStream = GeoJsonUtil.class.getResourceAsStream("/json/" + geoJsonFilePath);
        FeatureJSON featureJSON = new FeatureJSON();
        try {
            return featureJSON.readFeatureCollection(resourceAsStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

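    /**
     * Check whether any feature in the given collection contains the point
     * @param longitude longitude of the point
     * @param latitude latitude of the point
     * @param featureCollection features to search
     * @return true if some feature contains the point
     */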
    public static boolean contains(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                if (isContains(longitude, latitude, next)) {
                    return true;
                }
            }
        } finally {
            features.close();
        }
        return false;
    }

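    /**
     * Find the feature in the given collection that contains the point and return its properties
     * @param longitude longitude of the point
     * @param latitude latitude of the point
     * @param featureCollection features to search
     * @return waterCode and waterName of the containing feature, or null if none contains the point
     */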
    public static Map<String, Object> properties(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return the required properties
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

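    /**
     * Find the feature in the default GeoJSON file that contains the point and return its properties
     * @param longitude longitude of the point
     * @param latitude latitude of the point
     * @return waterCode and waterName of the containing feature, or null if none contains the point
     */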
    public static Map<String, Object> properties(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return the required properties
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

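    /**
     * Find the basin code (FID) of the feature that contains the point
     * @param longitude longitude of the point
     * @param latitude latitude of the point
     * @return the basin code, or null if no feature contains the point
     */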
    public static String getWaterCode(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return its basin code
                if (contains) {
                    return String.valueOf(next.getProperty("FID").getValue());
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    private static boolean isContains(double longitude, double latitude, Feature feature) {
        // Get the boundary geometry
        Property geometry = feature.getProperty("geometry");
        Object value = geometry.getValue();
        // Create a point from the coordinates
        GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
        Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
        boolean contains = false;
        // Check whether the geometry is a single polygon or a multi-polygon
        if (value instanceof MultiPolygon) {
            MultiPolygon multiPolygon = (MultiPolygon) value;
            contains = multiPolygon.contains(point);
        } else if (value instanceof Polygon) {
            Polygon polygon = (Polygon) value;
            contains = polygon.contains(point);
        }
        return contains;
    }
}
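
The utility class above delegates the point-in-polygon test to the JTS `contains` method. For readers who want to see what such a test involves, here is a self-contained ray-casting sketch in plain Java. It is a swapped-in textbook technique, not the algorithm JTS uses, and it ignores holes, multi-polygons and points exactly on an edge; `RayCasting` and its method are hypothetical names.

```java
class RayCasting {

    // ring: polygon vertices as {x, y} pairs, in order; the last vertex connects back to the first
    static boolean contains(double[][] ring, double x, double y) {
        boolean inside = false;
        for (int i = 0, j = ring.length - 1; i < ring.length; j = i++) {
            double xi = ring[i][0], yi = ring[i][1];
            double xj = ring[j][0], yj = ring[j][1];
            // Does a horizontal ray going right from (x, y) cross the edge (j -> i)?
            boolean crosses = (yi > y) != (yj > y)
                    && x < (xj - xi) * (y - yi) / (yj - yi) + xi;
            if (crosses) {
                inside = !inside; // an odd number of crossings means the point is inside
            }
        }
        return inside;
    }

    public static void main(String[] args) {
        double[][] square = {{0, 0}, {10, 0}, {10, 10}, {0, 10}};
        System.out.println(contains(square, 5, 5));
        System.out.println(contains(square, 15, 5));
    }
}
```

For real basin boundaries the JTS call in `isContains` remains the better choice, since it handles holes and multi-part geometries.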

7. Map (GeoTools) dependencies

<geotools.version>27-SNAPSHOT</geotools.version>
<!-- map -->
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-shapefile</artifactId>
            <version>${geotools.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.geotools</groupId>
                    <artifactId>gt-main</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-geojson</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-swing</artifactId>
            <version>${geotools.version}</version>
        </dependency>


<repositories>
    <repository>
        <id>osgeo</id>
        <name>OSGeo Release Repository</name>
        <url>https://repo.osgeo.org/repository/release/</url>
        <snapshots><enabled>false</enabled></snapshots>
        <releases><enabled>true</enabled></releases>
    </repository>
    <repository>
        <id>osgeo-snapshot</id>
        <name>OSGeo Snapshot Repository</name>
        <url>https://repo.osgeo.org/repository/snapshot/</url>
        <snapshots><enabled>true</enabled></snapshots>
        <releases><enabled>false</enabled></releases>
    </repository>
    </repositories>

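Reference git project springbatch: a Spring Batch example built with Spring Boot. It uses multiple threads to import 300,000 rows from a file into the t_cust_temp table, then uses partitioning to import the t_cust_temp data into the t_cust_info table. It works out of the box after download.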


http://www.niftyadmin.cn/n/5868867.html
