Source 组件介绍
Source 主要为数据源提供简易的操作接口,使底层的具体数据源(传统数据库、文件系统、内存数据库、Memcached/Redis缓存)对上层是透明的。其提供两种类型的数据源:DataSource 和 CacheSource。DataSource 为数据库或内存数据库,提供类似JPA、Hibernate的接口与功能。CacheSource 为缓存数据提供类似Memcached、Redis的接口和功能。两者也提供了异步接口(基于远程模式Service)。
DataSource 入门
JPA虽已提供了简洁成熟的数据库操作接口,但当数据、业务量很庞大的时候就显得捉襟见肘,与JPA相比,DataSource有以下几个特点:
1、简易的过滤查询接口,但仅支持简单的表关联查询。
2、简化分表分库操作。
3、通过watch组件动态更改数据库连接参数。
4、读写分离的简易配置。
5、提供异步接口。
数据库操作方面常见的是过滤查询操作,JPA规范中的JPQL虽然简化了SQL,但是对于动态产生的过滤条件,开发人员还是无法免去组装过滤条件的过程(无论JPQL还是CriteriaQuery), DataSource定义了FilterBean接口可以省略组装条件的过程,FilterNode提供了类似CriteriaQuery的功能,且这两种对象都可以序列化,给远程模式Service提供了基础,微服务架构提倡服务之间尽量降低耦合,因此DataSource仅支持简单的关联查询,复杂的表关联查询或统计应放在数据分析系统中。一个服务通常部署多个进程,若用JPA的缓存则进程之间的缓存无法同步,而DataSource采用SNCP协议即可方便地达到自动同步缓存功能。JPA无法在主数据库异常时动态切换到备份数据库,source.properties文件更改后已运行中的进程不会自动切换,需要开启watch组件通过watch动态更改正在运行进程中的配置。
为了降低学习成本,DataSource重用了JPA里的部分注解与配置文件,使用方法基本相同,与JPA用法的区别是注解只能标记于字段,不能标记在方法上。
注解类名 | 功能描述 |
---|---|
org.redkale.persistence.Cacheable | 标记Entity类是否需要缓存,与JPA用法一致 |
org.redkale.persistence.Column | 标记字段,只使用其name()、insertable()、updatable()属性 |
org.redkale.persistence.Entity | JPA的Entity类必须标记为@Entity, 而Redkale不强制要求,该注解一般较少使用 |
org.redkale.persistence.Id | 标记主键字段,与JPA用法一致 |
org.redkale.persistence.Table | 标记表的别名,与JPA用法一致 |
org.redkale.persistence.Transient | 标记是否为表对应的字段,与JPA用法一致 |
以下是Redkale自定义的注解 | |
@VirtualEntity | 用于非数据库表对应的Entity类,且仅用于开启缓存模式的DataSource |
@DistributeTable | 标记表进行分表分库存储, 与DistributeTableStrategy接口结合使用 |
@FilterColumn | 用于FilterBean过滤类的字段设置 |
@FilterJoinColumn | 用于FilterBean过滤类的关联表字段设置 |
@FilterGroup | 用于FilterBean过滤类的过滤条件分组设置 |
操作数据源主要使用的对象有 DataSource、FilterBean、FilterNode。DataSource 提供的接口分几种系列:
系列方法 | 功能描述 |
---|---|
insert | 插入数据 |
delete | 删除数据 |
update | 更新数据 |
updateColumn | 更新数据的部分字段 |
getNumberXXX | 统计查询,用于查询字段的总和、最大值、平均值等数据 |
queryColumnXXX | 单个字段数据查询和字段的统计查询 |
find | 查找单个对象 |
queryList | 查询对象的List集合 |
querySheet | 查询对象的Sheet页式集合 |
directXXX | 直接运行SQL语句,用于复杂的关联查询与更新(仅限DataSqlSource) |
以上接口除了directXXX,其他都有等同的异步接口。insert、delete、update接口与JPA同名接口用法一样。DataSource提供了丰富的查询接口,且有独特的翻页查询功能。每以系列的方法主要重载三类: 单个字段过滤、FilterBean过滤和FilterNode过滤。
返回类型为CompletableFuture的接口均为异步接口。
开发者可以借鉴 Redkale-demo 中的 AutoClassCreator的代码根据数据表自动生成Entity代码。
过滤条件
FilterBean、FilterNode对象用于构造过滤条件。FilterBean可以转化为FilterNode。FilterBean主要用于接收外界构建的过滤条件,而FilterNode为了构建内部的过滤条件且降低过滤条件变化的耦合性,FilterNode中name值以#开头的视为虚拟字段,不会构建成过滤条件,仅供分布分库的DistributeTableStrategy策略使用。
public class UserBean implements FilterBean {
private int userid;
@FilterColumn(express = FilterExpress.LIKE)
private String userName;
private Range age;
public UserBean(int userid, String userName, Range age) {
this.userid = userid;
this.userName = userName;
this.age = age;
}
/** 以下省略getter setter方法 */
}
new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于
FilterNodes.eq("userid", 200001).like("userName", "redkale").between("age", new Range.IntRange(14, 36))
new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND userName LIKE '%redkale%' AND age BETWEEN 14 AND 36"
new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND userName LIKE '%redkale%'"
new UserBean(0, "redkale", null) 等价于 "WHERE userName LIKE '%redkale%'"
如上定义UserBean过滤条件,当非数值类字段值为null、字符串值为空、数值类字段值小于@FilterColumn.least()值(least的默认值为1)都不会构建成过滤条件。@FilterColumn.express根据字段的类型有不同的默认值,若字段类型为Collection子类或数组则express默认为FilterExpress.IN;若字段类型为Range的子类则express默认为FilterExpress.BETWEEN;其他类型则express默认为FilterExpress.EQUAL。默认字段之间是AND关系,若想使用OR关系则需要使用@FilterOrs进行标记:
@FilterOrs{"a"}
public class UserBean implements FilterBean {
private int userid;
@FilterGroup("a")
@FilterColumn(express = FilterExpress.LIKE)
private String userName;
@FilterGroup("a")
private Range age;
public UserBean(int userid, String userName, Range age) {
this.userid = userid;
this.userName = userName;
this.age = age;
}
/** 以下省略getter setter方法 */
}
new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于
FilterNode orNode = FilterNodes.like("userName", "redkale").or("age", new Range.IntRange(14, 36));
FilterNode node = FilterNodes.eq("userid", 200001).and(orNode);
new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND (userName LIKE '%redkale%' OR age BETWEEN 14 AND 36)"
new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND userName LIKE '%redkale%'"
new UserBean(0, "redkale", null) 等价于 "WHERE userName LIKE '%redkale%'"
source.getNumberResult(User.class, FilterFunc.COUNT, null, new UserBean(0, "redkale", new IntRange(14, 36))).intValue() 等价于
"SELECT COUNT(*) FROM user WHERE userName LIKE '%redkale%' AND age BETWEEN 14 AND 36"
如上@FilterGroup 的value 用.来构建树结构,没有标记@FilterGroup的字段等价于标记了@FilterGroup(value = "")。
分表分库
DataSource提供了单个实体类对应多个数据库表的功能,通常流水型的数据量比较大,单个数据库无法存储,DataSource提供了简单的分表操作,同时在接口设计上尽量减少单表操作与分表操作的差异。分表分库只需在实体类上注解@DistributeTable并实现DistributeTableStrategy分表策略即可。
public interface DistributeTableStrategy<T> {
/**
* 获取对象的表名
* 查询单个对象时调用本方法获取表名
*
* @param table 模板表的表名
* @param primary 记录主键
*
* @return
*/
public String getTable(String table, Serializable primary);
/**
* 获取对象的表名
* 查询、修改、删除对象时调用本方法获取表名
* 注意: 需保证FilterNode过滤的结果集合必须在一个数据库表中
*
* @param table 模板表的表名
* @param node 过滤条件
*
* @return
*/
public String getTable(String table, FilterNode node);
/**
* 获取对象的表名
* 新增对象或更新单个对象时调用本方法获取表名
*
* @param table 模板表的表名
* @param bean 实体对象
*
* @return
*/
public String getTable(String table, T bean);
}
DistributeTableStrategy分表策略需要实现三个接口,模板表由实体类的@Table注解提供。Redkale默认实现的MySQL数据库的拷贝表结构语句,其他数据库类型需要通过指定source.properties 中的 tablenotexist-sqlstates 与 tablecopy-sqltemplate 来配置。
@DistributeTable(strategy = LoginRecord.TableStrategy.class)
public class LoginRecord extends BaseEntity {
@Id
@Column(comment = "主键ID; 值=create36time(9位)+'-'+UUID(32位)")
private String loginid = ""; //主键ID; 值=create36time(9位)+'-'+UUID(32位)
@Column(updatable = false, comment = "C端用户ID")
private long userid; //C端用户ID
@Column(updatable = false, comment = "登录网络类型; wifi/4g/3g")
private String netMode = ""; //登录网络类型; wifi/4g/3g
@Column(updatable = false, comment = "APP版本信息")
private String appVersion = ""; //APP版本信息
@Column(updatable = false, comment = "APP操作系统信息")
private String appos = ""; //APP操作系统信息
@Column(updatable = false, comment = "登录时客户端信息")
private String loginAgent = ""; //登录时客户端信息
@Column(updatable = false, comment = "登录时的IP")
private String loginAddr = ""; //登录时的IP
@Column(updatable = false, comment = "创建时间")
private long createTime; //创建时间
/** 以下省略getter setter方法 */
//创建对象
public static void main(String[] args) throws Throwable {
LoginRecord record = new LoginRecord();
long now = System.currentTimeMillis();
record.setCreateTime(now); //设置创建时间
record.setLoginid(Utility.format36time(now) + "-" + Utility.uuid()); //主键的生成规则
//.... 填充其他字段
source.insert(record);
}
public static class TableStrategy implements DistributeTableStrategy<LoginRecord> {
private static final String dayformat = "%1$tY%1$tm%1$td"; //一天一个表
private static final String yearformat = "%1$tY"; //一年一个库
//过滤查询时调用本方法
@Override
public String getTable(String table, FilterNode node) {
Serializable day = node.findValue("#day"); //LoginRecord没有day字段,所以前面要加#,表示虚拟字段, 值为yyyyMMdd格式
if (day != null) getTable(table, (Integer) day, 0L); //存在#day参数则直接使用day值
Serializable time = node.findValue("createTime"); //存在createTime则使用最小时间,且createTime的范围必须在一天内,因为本表以天为单位建表
return getTable(table, 0, (time == null ? 0L : (time instanceof Range ? ((Range.LongRange) time).getMin() : (Long) time)));
}
//创建或单个查询时调用本方法
@Override
public String getTable(String table, LoginRecord bean) {
return getTable(table, 0, bean.getCreateTime());
}
//根据主键ID查询单个记录时调用本方法
@Override
public String getTable(String table, Serializable primary) {
String id = (String) primary;
return getTable(table, 0, Long.parseLong(id.substring(0, 9), 36));
}
private String getTable(String table, int day, long createTime) { //day为0或yyyyMMdd格式数据
int pos = table.indexOf('.');
String year = day > 0 ? String.valueOf(day / 10000) : String.format(yearformat, createTime); //没有day取createTime
return "platf_login_" + year + "." + table.substring(pos + 1) + "_" + (day > 0 ? day : String.format(dayformat, createTime));
}
}
}
如上范例,用户登陆记录的分表分库策略为一年一个库,一个库中365张表,每天一个表。为了分表策略的三个接口均得到实现,需要对主键ID的生成规则进行一定的设计。常见的场景是查询单个用户的登录列表。上面的范例就无法满足查询单个用户的登录信息需求,而分表策略又只能根据一种规则生成,因此需要按用户维度存在另外一张表中。
@DistributeTable(strategy = LoginUserRecord.TableStrategy.class)
public class LoginUserRecord extends BaseEntity {
@Id
@Column(comment = "记录ID; 值=userid+'-'+UUID")
private String seqid = ""; //记录ID; 值=userid+'-'+UUID
@Column(updatable = false, comment = "C端用户ID")
private long userid; //C端用户ID
@Column(comment = "LoginRecord主键")
private String loginid = ""; //LoginRecord主键
@Column(updatable = false, comment = "创建时间")
private long createTime; //创建时间
/** 以下省略getter setter方法 */
public static class TableStrategy implements DistributeTableStrategy<LoginUserRecord> {
@Override
public String getTable(String table, LoginUserRecord bean) {
return getTable(table, bean.getUserid());
}
@Override
public String getTable(String table, FilterNode node) {
Serializable id = node.findValue("userid");
if (id != null) return getTable(table, id);
return getHashTable(table, (Integer) node.findValue("#hash"));
}
@Override
public String getTable(String table, Serializable primary) {
String id = (String) primary;
return getHashTable(table, (int) (Long.parseLong(id.substring(0, id.indexOf('-'))) % 100));
}
private String getHashTable(String table, int hash) {
int pos = table.indexOf('.');
return "platf_login." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
}
}
}
如上,表LoginUserRecord只存储用户ID与登录信息ID的关联关系,以用户ID取模100进行hash存储,获取用户登录列表时,先查询LoginUserRecord一页的数据,再根据loginid查询LoginRecord实体。常见的分表策略是时间和主键hash,例如用户信息表采用主键hash分表:
@DistributeTable(strategy = UserDetail.TableStrategy.class)
public class UserDetail extends BaseEntity {
@Id
private long userid; //用户ID
@Column(length = 64, comment = "用户昵称")
private String userName = ""; //用户昵称
@Column(length = 32, comment = "手机号码")
private String mobile = ""; //手机号码
@Column(length = 64, comment = "密码")
@ConvertColumn(ignore = true, type = ConvertType.ALL)
private String password = ""; //密码
@Column(length = 128, comment = "备注")
private String remark = ""; //备注
@Column(updatable = false, comment = "创建时间")
private long createTime; //创建时间
/** 以下省略getter setter方法 */
public static class TableStrategy implements DistributeTableStrategy<UserDetail> {
@Override
public String getTable(String table, UserDetail bean) {
return getTable(table, bean.getUserid());
}
@Override
public String getTable(String table, FilterNode node) {
Serializable id = node.findValue("userid");
if (id != null) return getTable(table, id);
return getHashTable(table, (Integer) node.findValue("#hash"));
}
@Override
public String getTable(String table, Serializable userid) {
return getHashTable(table, (int) (((Long) userid) % 100));
}
private String getHashTable(String table, int hash) {
int pos = table.indexOf('.');
return "platf_user." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
}
}
}
如上,用户表以userid取模100进行hash分表,若需要提供根据手机号查询单个用户信息,则需要另外存在一个用户ID对应手机号码的关系表,同样可以以手机号后两位数字为hash存储。
CacheSource 入门
CacheSource同Memcached类似,像一个带有过期功能地Map容器,存放key-value数据。常见的使用场景就是存放HTTP的Session信息。Redkale把用户会话信息数据当做业务数据处理,而不是接入层的数据。WebSocket的连接态数据也是用CacheSource存储。key为WebSocket的groupid,value为WebSocket服务端节点的IP地址列表。
public class UserService implements Service {
//用户简单信息缓存
private final Map<Integer, UserInfo> users = new ConcurrentHashMap<>();
//使用CacheSource必须要指明泛型
@Resource(name = "usersessions")
protected CacheSource sessions;
//登录
public RetResult<UserInfo> login(LoginBean bean) { //bean.sessionid 在接入层进行赋值
UserInfo user = null;
// 登陆逻辑 user = ...
users.put(user.getUserid(), user);
sessions.setLong(600, bean.getSessionid(), user.getUserid()); //session过期时间设置为10分钟
return new RetResult<>(user);
}
//获取当前用户信息
public UserInfo current(String sessionid) { //给HTTP的BaseServlet用
Long userid = sessions.getexLong(sessionid);
return userid == null ? null : users.get(userid.intValue());
}
//注销
public void logout(String sessionid) {
sessions.remove(sessionid);
}
}
以上是个简单的范例,用于用户模块存放sessionid。
source.properties 配置说明
# CacheSource @Resource(name="usersession")
# type可以不用设置,框架会根据url判断使用哪个CacheSource实现类
redkale.cachesource[usersession].type = org.redkalex.cache.redis.RedisCacheSource
# 最大连接数
redkale.cachesource[usersession].maxconns = 16
# 节点地址
redkale.cachesource[usersession].node[0].url = redis://127.0.0.1:6363
# 节点密码
redkale.cachesource[usersession].node[0].password = 12345678
# 节点db
redkale.cachesource[usersession].node[0].db = 0
# DataSource @Resource(name="platf")
# type可以不用设置,框架会根据url判断使用哪个DataSource实现类,默认值: org.redkale.source.DataJdbcSource
redkale.datasource[platf].type = org.redkale.source.DataJdbcSource
# 是否开启缓存(标记为@Cacheable的Entity类),值目前只支持两种: ALL: 所有开启缓存。 NONE: 关闭所有缓存, 非NONE字样统一视为ALL
redkale.datasource[platf].cachemode = ALL
# 是否自动建表当表不存在的时候, 目前只支持mysql、postgres, 默认为false
redkale.datasource[platf].table-autoddl = false
# 用户
redkale.datasource[platf].user = root
# 密码
redkale.datasource[platf].password = 12345678
# 多个URL用;隔开,如分布式SearchSource需要配多个URL
redkale.datasource[platf].url = jdbc:mysql://127.0.0.1:3306/platf?allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&serverTimezone=UTC&characterEncoding=utf8
# 最大连接数,默认值:CPU数
redkale.datasource[platf].maxconns = 16
# 包含的SQL模板,相当于反向LIKE,不同的JDBC驱动的SQL语句不一样,Redkale内置了MySQL的语句
redkale.datasource[platf].contain-sqltemplate = LOCATE(${keystr}, ${column}) > 0
# 包含的SQL模板,相当于反向LIKE,不同的JDBC驱动的SQL语句不一样,Redkale内置了MySQL的语句
redkale.datasource[platf].notcontain-sqltemplate = LOCATE(${keystr}, ${column}) = 0
# 复制表结构的SQL模板,Redkale内置了MySQL的语句
redkale.datasource[platf].tablenotexist-sqlstates = 42000;42S02
# 复制表结构的SQL模板,Redkale内置了MySQL的语句
redkale.datasource[platf].tablecopy-sqltemplate = CREATE TABLE IF NOT EXISTS ${newtable} LIKE ${oldtable}
# DataSource 读写分离
redkale.datasource[platf].read.url = jdbc:mysql://127.0.0.1:3306/platf_r?allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&serverTimezone=UTC&characterEncoding=utf8
redkale.datasource[platf].read.user = root
redkale.datasource[platf].read.password = 12345678
redkale.datasource[platf].write.url = jdbc:mysql://127.0.0.1:3306/platf_w?allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&serverTimezone=UTC&characterEncoding=utf8
redkale.datasource[platf].write.user = root
redkale.datasource[platf].write.password = 12345678