扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
sharding中怎么执行jdbc,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
创新互联服务项目包括肃宁网站建设、肃宁网站制作、肃宁网页制作以及肃宁网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,肃宁网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到肃宁省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
内存限制模式:使用此模式的前提是,ShardingSphere对一次操作所耗费的数据库连接数量不做限制。如果实际执行的SQL需要对某数据库实例中的200张表做操作,则对每张表创建一个新的数据库连接,并通过多线程的方式并发处理,以达成执行效率最大化。并且在SQL满足条件情况下,优先选择流式归并,以防止出现内存溢出或避免频繁垃圾回收情况
连接限制模式:使用此模式的前提是,ShardingSphere严格控制对一次操作所耗费的数据库连接数量。如果实际执行的SQL需要对某数据库实例中的200张表做操作,那么只会创建唯一的数据库连接,并对其200张表串行处理。如果一次操作中的分片散落在不同的数据库,仍然采用多线程处理对不同库的操作,但每个库的每次操作仍然只创建一个唯一的数据库连接。这样即可以防止对一次请求对数据库连接占用过多所带来的问题。该模式始终选择内存归并
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一个简单查询语句,来分析ss大致如何来执行sql,根据分片改写后的sql,应该是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 来执行
1.初始化PreparedStatementExecutor#init,封装Statement执行单元
public final class PreparedStatementExecutor extends AbstractStatementExecutor { @Getter private final boolean returnGeneratedKeys; public PreparedStatementExecutor( final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) { super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection); this.returnGeneratedKeys = returnGeneratedKeys; } /** * Initialize executor. * * @param routeResult route result * @throws SQLException SQL exception */ public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement()); //添加路由单元,即数据源对应的sql单元 getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); //缓存statement、参数 cacheStatements(); } private Collection> obtainExecuteGroups(final Collection routeUnits) throws SQLException { //执行封装Statement执行单元 return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); } @SuppressWarnings("MagicConstant") private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException { return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()); } ... ... }
2.执行封装Statement执行单元getSqlExecutePrepareTemplate().getExecuteUnitGroups
@RequiredArgsConstructor public final class SQLExecutePrepareTemplate { private final int maxConnectionsSizePerQuery; /** * Get execute unit groups. * * @param routeUnits route units * @param callback SQL execute prepare callback * @return statement execute unit groups * @throws SQLException SQL exception */ public Collection> getExecuteUnitGroups(final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(routeUnits, callback); } private Collection > getSynchronizedExecuteUnitGroups( final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { //数据源对应sql单元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?] Map > sqlUnitGroups = getSQLUnitGroups(routeUnits); Collection > result = new LinkedList<>(); for (Entry > entry : sqlUnitGroups.entrySet()) { //添加分片执行组 result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } private Map > getSQLUnitGroups(final Collection routeUnits) { Map > result = new LinkedHashMap<>(routeUnits.size(), 1); for (RouteUnit each : routeUnits) { if (!result.containsKey(each.getDataSourceName())) { result.put(each.getDataSourceName(), new LinkedList ()); } result.get(each.getDataSourceName()).add(each.getSqlUnit()); } return result; } private List > getSQLExecuteGroups( final String dataSourceName, final List sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List > result = new LinkedList<>(); //在maxConnectionSizePerQuery允许的范围内,当一个连接需要执行的请求数量大于1时,意味着当前的数据库连接无法持有相应的数据结果集,则必须采用内存归并; //反之,当一个连接需要执行的请求数量等于1时,意味着当前的数据库连接可以持有相应的数据结果集,则可以采用流式归并 //TODO 场景:在不分库只分表的情况下,会存在一个数据源对应多个sql单元的情况 //计算所需要的分区大小 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); //按照分区大小进行分区 //事例: //sqlUnits = [1, 2, 3, 4, 5] //desiredPartitionSize = 2 //则结果为:[[1, 2], [3,4], [5]] List > sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); //maxConnectionsSizePerQuery该参数表示一次查询时每个数据库所允许使用的最大连接数 //根据maxConnectionsSizePerQuery来判断使用连接模式 //CONNECTION_STRICTLY 连接限制模式 //MEMORY_STRICTLY 内存限制模式 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; //获取分区大小的连接 List
connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; //遍历分区,将分区好的sql单元放到指定连接执行 for (List each : sqlUnitPartitions) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; } private ShardingExecuteGroup getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, final String dataSourceName, final List sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException { List result = new LinkedList<>(); //遍历sql单元 for (SQLUnit each : sqlUnitGroup) { //回调,创建statement执行单元 result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode)); } //封装成分片执行组 return new ShardingExecuteGroup<>(result); } }
1.执行查询sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... /** * Execute query. * * @return result set list * @throws SQLException SQL exception */ public ListexecuteQuery() throws SQLException { //获取当前是否异常值 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); //创建回调实例 //执行SQLExecuteCallback的execute方法 SQLExecuteCallback executeCallback = new SQLExecuteCallback (getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(statement, connectionMode); } }; return executeCallback(executeCallback); } ... ... protected final List executeCallback(final SQLExecuteCallback executeCallback) throws SQLException { List result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); //执行完后刷新分片元数据,比如创建表、修改表etc. refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement); return result; } ... ... }
2.通过线程池分组执行,并回调callback
@RequiredArgsConstructor public abstract class SQLExecuteCallbackimplements ShardingGroupExecuteCallback { //数据库类型 private final DatabaseType databaseType; //是否异常 private final boolean isExceptionThrown; @Override public final Collection execute(final Collection statementExecuteUnits, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { Collection result = new LinkedList<>(); //遍历statement执行单元 for (StatementExecuteUnit each : statementExecuteUnits) { //执行添加返回结果T result.add(execute0(each, isTrunkThread, shardingExecuteDataMap)); } return result; } private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { //设置当前线程是否异常 ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); //根据url获取数据源元数据 DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL()); //sql执行钩子 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook(); try { sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap); //执行sql T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode()); sqlExecutionHook.finishSuccess(); return result; } catch (final SQLException ex) { sqlExecutionHook.finishFailure(ex); ExecutorExceptionHandler.handleException(ex); return null; } } protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException; }
3.执行executeSQL,调用第三步的callback中的executeSQL,封装ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException { PreparedStatement preparedStatement = (PreparedStatement) statement; ResultSet resultSet = preparedStatement.executeQuery(); ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule(); //缓存resultSet getResultSets().add(resultSet); //判断ConnectionMode //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否则使用内存MemoryQueryResult return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) : new MemoryQueryResult(resultSet, shardingRule); } ... ... }
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流