Streaming MySQL Results Using Java 8 Streams

By Butterfly

This article is inspired by the posts here and here.

Our team uses a RESTful service as its data-access infrastructure. It is based on Jersey/JAX-RS and runs fast. However, it consumes a lot of memory when the response is a large data set, because it builds the entire response in memory before sending it.

As the posts above suggest, streaming is the solution. They integrate with Hibernate or Spring Data for easy adoption, but I need a general-purpose RESTful service that works even when, say, the schema of a table is not known in advance. So I decided to implement it myself on top of the raw JDBC interface.

My class is called MysqlStreamTemplate. NOTE: the Statement and Connection are closed only when Stream#close() is invoked explicitly:

import javax.sql.DataSource;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class MysqlStreamTemplate {

    private DataSource dataSource;

    public MysqlStreamTemplate(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public Stream<Map> query(String sql) throws SQLException {
        return new MysqlStreamQuery().stream(sql);
    }

    class MysqlStreamQuery implements Closeable {

        private Connection connection;
        private Statement statement;

        public Stream<Map> stream(String sql) throws SQLException {
            try {
                connection = dataSource.getConnection();
                /*
                 * By default, MySQL ResultSets are completely retrieved and stored in memory
                 * (com.mysql.jdbc.RowDataStatic). To avoid that, either:
                 * - set useCursorFetch=true&defaultFetchSize=nnn in the connection string
                 *   (com.mysql.jdbc.RowDataCursor), or
                 * - set resultSetType/resultSetConcurrency and fetchSize (Integer.MIN_VALUE)
                 *   when creating statements (com.mysql.jdbc.RowDataDynamic), as done here.
                 * See: https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
                 */
                /*
                 * The MySQL documentation says nothing about cursor holdability, so it is not set explicitly.
                 */
                statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                statement.setFetchSize(Integer.MIN_VALUE);
                /* begin query */
                ResultSet rs = statement.executeQuery(sql);
                /* Read the column labels once instead of once per column per row. */
                int columns = rs.getMetaData().getColumnCount();
                String[] labels = new String[columns];
                for (int i = 0; i < columns; i++) {
                    labels[i] = rs.getMetaData().getColumnLabel(i + 1);
                }
                /* NOTE: Stream.close() must be invoked manually to close the MySQL statement and connection. */
                Stream<Map> resultStream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Map>(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE) {
                    @Override
                    public boolean tryAdvance(Consumer<? super Map> action) {
                        try {
                            if (!rs.next()) {
                                return false;
                            }
                            /* A fresh map per row, so rows stay valid if a caller buffers them (collect, sorted, ...). */
                            Map<String, Object> row = new HashMap<>(columns * 2);
                            for (int i = 0; i < columns; i++) {
                                row.put(labels[i], rs.getObject(i + 1));
                            }
                            action.accept(row);
                            return true;
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }, false).onClose(() -> close());
                return resultStream;
            } catch (SQLException | RuntimeException e) {
                /* Do not leak the connection or statement if setting up the query fails. */
                close();
                throw e;
            }
        }

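        @Override
        public void close() {
            if (statement != null) {
                try {
                    /* Closing the statement also closes its ResultSet. */
                    statement.close();
                } catch (SQLException e) {
                    /* Best-effort cleanup: ignore and continue with the connection. */
                }
                statement = null;
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    /* Best-effort cleanup: nothing useful can be done here. */
                }
                connection = null;
            }
        }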
    }

}
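
Because the close handler runs only when Stream#close() is called, callers will usually want try-with-resources around the returned stream. The following is a minimal sketch of that behavior using only the JDK. StreamCloseDemo and openRows are hypothetical stand-ins for MysqlStreamTemplate#query, with a list acting as the "resource" so the effect is visible without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StreamCloseDemo {

    /** Hypothetical stand-in for MysqlStreamTemplate#query: the close handler "releases" the resource. */
    static Stream<String> openRows(List<String> log) {
        log.add("open");
        return Stream.of("row1", "row2").onClose(() -> log.add("close"));
    }

    /** A terminal operation alone does not run the close handler, so the resource would leak. */
    static List<String> withoutClose() {
        List<String> log = new ArrayList<>();
        openRows(log).forEach(log::add);
        return log;
    }

    /** try-with-resources calls Stream#close() when the block exits, even if the body throws. */
    static List<String> withTryWithResources() {
        List<String> log = new ArrayList<>();
        try (Stream<String> rows = openRows(log)) {
            rows.forEach(log::add);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withoutClose());         // [open, row1, row2]
        System.out.println(withTryWithResources()); // [open, row1, row2, close]
    }
}
```

With the real class, the same pattern would wrap the call to query(sql), so that the Statement and Connection are released as soon as the rows have been consumed.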

See the inline comments for additional details. Next come the response entity and the controller mapping:

import java.util.Map;
import java.util.stream.Stream;

public class ApiStreamResponse extends Response {

    /* requires jackson-datatype-jdk8 2.9.0 */
    private Stream<Map> result;

    public ApiStreamResponse(Stream<Map> result) {
        this.result = result;
    }

    public Stream<Map> getResult() {
        return result;
    }

    public void setResult(Stream<Map> result) {
        this.result = result;
    }

}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

@RequestMapping(path = "/api")
@RestController
public class ApiController {

    private static final Logger logger = LoggerFactory.getLogger(ApiController.class);

    @Autowired
    private MysqlClient mysqlClient;

    @RequestMapping(path = "/v1", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public Callable<ApiResponse> getV1() {
        return () -> {
            String r = mysqlClient.executeToJson(MysqlClient.SQL).getLeft();
            return new ApiResponse(r);
        };
    }

    @RequestMapping(path = "/v2", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public Callable<ApiStreamResponse> getV2() {
        return () -> {
            Stream<Map> r = mysqlClient.executeToStream(MysqlClient.SQL);
            return new ApiStreamResponse(r);
        };
    }

}
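
To illustrate why returning a Stream keeps memory flat, here is a rough sketch of what serializing rows one at a time looks like. It is not what Jackson does internally; RowStreamWriter and its hand-rolled JSON encoding are assumptions made for illustration only, and a real service should leave the encoding to its JSON library. The point is that each row is written and then discarded, and the stream is closed at the end.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

class RowStreamWriter {

    /** Writes the rows as a JSON array, one row at a time, and closes the stream afterwards. */
    static void writeJsonArray(Stream<Map<String, Object>> rows, Writer out) throws IOException {
        try (Stream<Map<String, Object>> closing = rows) {
            out.write('[');
            Iterator<Map<String, Object>> it = closing.iterator();
            while (it.hasNext()) {
                writeObject(it.next(), out);
                if (it.hasNext()) {
                    out.write(',');
                }
            }
            out.write(']');
        }
    }

    private static void writeObject(Map<String, Object> row, Writer out) throws IOException {
        out.write('{');
        boolean first = true;
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!first) {
                out.write(',');
            }
            first = false;
            writeString(e.getKey(), out);
            out.write(':');
            Object v = e.getValue();
            if (v == null || v instanceof Number || v instanceof Boolean) {
                out.write(String.valueOf(v)); // simplification: NaN/Infinity are not handled
            } else {
                writeString(v.toString(), out);
            }
        }
        out.write('}');
    }

    /** Minimal escaping: quotes, backslashes, and control characters. */
    private static void writeString(String s, Writer out) throws IOException {
        out.write('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                out.write('\\');
                out.write(c);
            } else if (c < 0x20) {
                out.write(String.format("\\u%04x", (int) c));
            } else {
                out.write(c);
            }
        }
        out.write('"');
    }

    /** Small in-memory demo; in a service the Writer would wrap the HTTP response body. */
    static String demo() {
        Map<String, Object> row1 = new LinkedHashMap<>();
        row1.put("id", 1);
        row1.put("name", "a\"b");
        Map<String, Object> row2 = new LinkedHashMap<>();
        row2.put("id", 2);
        row2.put("name", null);
        StringWriter out = new StringWriter();
        try {
            writeJsonArray(Stream.of(row1, row2), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }
}
```

Only one row is reachable at any moment, so short-lived row objects can be collected in the young generation instead of accumulating until the whole response has been built.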

The complete code can be found in my GitHub repository.

My simple benchmark script looks like:

# ab -c 30 -n 3000 http://localhost:5050/api

jconsole shows a dramatic improvement in memory usage, especially in the Old Gen:
[Figure: all_memory]
[Figure: old_gen_memory]

Some raw data from jmap:
