Source 组件介绍

         Source 主要为数据源提供简易的操作接口,使底层的具体数据源(传统数据库、文件系统、内存数据库、Memcached/Redis缓存)对上层是透明的。其提供两种类型的数据源:DataSource 和 CacheSource。DataSource 为数据库或内存数据库,提供类似JPA、Hibernate的接口与功能。CacheSource 为缓存数据提供类似Memcached、Redis的接口和功能。两者也提供了异步接口(基于远程模式Service)。

DataSource 入门

        JPA虽已提供了简洁成熟的数据库操作接口,但当数据、业务量很庞大的时候就显得捉襟见肘,与JPA相比,DataSource有以下几个特点:
                 1、简易的过滤查询接口,但仅支持简单的表关联查询。
                 2、简化分表分库操作。
                 3、通过watch组件动态更改数据库连接参数。
                 4、读写分离的简易配置。
                 5、提供异步接口。
        数据库操作方面常见的是过滤查询操作,JPA规范中的JPQL虽然简化了SQL,但是对于动态产生的过滤条件,开发人员还是无法免去组装过滤条件的过程(无论JPQL还是CriteriaQuery), DataSource定义了FilterBean接口可以省略组装条件的过程,FilterNode提供了类似CriteriaQuery的功能,且这两种对象都可以序列化,给远程模式Service提供了基础,微服务架构提倡服务之间尽量降低耦合,因此DataSource仅支持简单的关联查询,复杂的表关联查询或统计应放在数据分析系统中。一个服务通常部署多个进程,若用JPA的缓存则进程之间的缓存无法同步,而DataSource采用SNCP协议即可方便地达到自动同步缓存功能。JPA无法在主数据库异常时动态切换到备份数据库,source.properties文件更改后已运行中的进程不会自动切换,需要开启watch组件通过watch动态更改正在运行进程中的配置。
        为了降低学习成本,DataSource重用了JPA里的部分注解与配置文件,使用方法基本相同,与JPA用法的区别是注解只能标记于字段,不能标记在方法上。

注解类名功能描述
org.redkale.persistence.Cacheable标记Entity类是否需要缓存,与JPA用法一致
org.redkale.persistence.Column标记字段,只使用其name()、insertable()、updatable()属性
org.redkale.persistence.EntityJPA的Entity类必须标记为@Entity, 而Redkale不强制要求,该注解一般较少使用
org.redkale.persistence.Id标记主键字段,与JPA用法一致
org.redkale.persistence.Table标记表的别名,与JPA用法一致
org.redkale.persistence.Transient标记是否为表对应的字段,与JPA用法一致
以下是Redkale自定义的注解
@VirtualEntity用于非数据库表对应的Entity类,且仅用于开启缓存模式的DataSource
@DistributeTable标记表进行分表分库存储, 与DistributeTableStrategy接口结合使用
@FilterColumn用于FilterBean过滤类的字段设置
@FilterJoinColumn用于FilterBean过滤类的关联表字段设置
@FilterGroup用于FilterBean过滤类的过滤条件分组设置

        操作数据源主要使用的对象有 DataSource、FilterBean、FilterNode。DataSource 提供的接口分几种系列:

系列方法功能描述
insert插入数据
delete删除数据
update更新数据
updateColumn更新数据的部分字段
getNumberXXX统计查询,用于查询字段的总和、最大值、平均值等数据
queryColumnXXX单个字段数据查询和字段的统计查询
find查找单个对象
queryList查询对象的List集合
querySheet查询对象的Sheet页式集合
directXXX直接运行SQL语句,用于复杂的关联查询与更新(仅限DataSqlSource)

        以上接口除了directXXX,其他都有等同的异步接口。insert、delete、update接口与JPA同名接口用法一样。DataSource提供了丰富的查询接口,且有独特的翻页查询功能。每以系列的方法主要重载三类: 单个字段过滤、FilterBean过滤和FilterNode过滤。
        返回类型为CompletableFuture的接口均为异步接口
        开发者可以借鉴 Redkale-demo 中的 AutoClassCreator的代码根据数据表自动生成Entity代码。

    过滤条件

        FilterBean、FilterNode对象用于构造过滤条件。FilterBean可以转化为FilterNode。FilterBean主要用于接收外界构建的过滤条件,而FilterNode为了构建内部的过滤条件且降低过滤条件变化的耦合性,FilterNode中name值以#开头的视为虚拟字段,不会构建成过滤条件,仅供分布分库的DistributeTableStrategy策略使用。

public class UserBean implements FilterBean {

    private int userid;

    @FilterColumn(express = FilterExpress.LIKE)
    private String userName;

    private Range age;

    public UserBean(int userid, String userName, Range age) {
        this.userid = userid;
        this.userName = userName;
        this.age = age;
    }

    /** 以下省略getter setter方法 */

}



 new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于 
 FilterNodes.eq("userid", 200001).like("userName", "redkale").between("age", new Range.IntRange(14, 36))
 
 new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND userName LIKE '%redkale%' AND age BETWEEN 14 AND 36"
 new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND userName LIKE '%redkale%'"
 new UserBean(0, "redkale", null) 等价于 "WHERE userName LIKE '%redkale%'"

                

        如上定义UserBean过滤条件,当非数值类字段值为null、字符串值为空、数值类字段值小于@FilterColumn.least()值(least的默认值为1)都不会构建成过滤条件。@FilterColumn.express根据字段的类型有不同的默认值,若字段类型为Collection子类或数组则express默认为FilterExpress.IN;若字段类型为Range的子类则express默认为FilterExpress.BETWEEN;其他类型则express默认为FilterExpress.EQUAL。默认字段之间是AND关系,若想使用OR关系则需要使用@FilterGroup进行标记:

public class UserBean implements FilterBean {

    private int userid;

    @FilterGroup("[OR]a")
    @FilterColumn(express = FilterExpress.LIKE)
    private String userName;

    @FilterGroup("[OR]a")
    private Range age;

    public UserBean(int userid, String userName, Range age) {
        this.userid = userid;
        this.userName = userName;
        this.age = age;
    }

    /** 以下省略getter setter方法 */

}



 new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于 
 FilterNode orNode = FilterNodes.like("userName", "redkale").or("age", new Range.IntRange(14, 36)); 
 FilterNode node = FilterNodes.eq("userid", 200001).and(orNode);
 
 new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND (userName LIKE '%redkale%' OR age BETWEEN 14 AND 36)"
 new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND userName LIKE '%redkale%'"
 new UserBean(0, "redkale", null) 等价于 "WHERE userName LIKE '%redkale%'"


 source.getNumberResult(User.class, FilterFunc.COUNT, null, new UserBean(0, "redkale", new IntRange(14, 36))).intValue() 等价于
 "SELECT COUNT(*) FROM user WHERE userName LIKE '%redkale%' AND  age BETWEEN 14 AND 36"
                

        如上@FilterGroup 的value 必须是[OR]或者[AND]开头,没有标记@FilterGroup的字段等价于标记了@FilterGroup(value = "[AND]")。[AND]、[OR]后面的字符串为GROUP_NAME,默认的GROUP_NAME为空字符串。如上"[OR]a"可以直接使用"[OR]",有多个[OR]或者[AND]则需要加上不同的NAME。

    分表分库

        DataSource提供了单个实体类对应多个数据库表的功能,通常流水型的数据量比较大,单个数据库无法存储,DataSource提供了简单的分表操作,同时在接口设计上尽量减少单表操作与分表操作的差异。分表分库只需在实体类上注解@DistributeTable并实现DistributeTableStrategy分表策略即可。

public interface DistributeTableStrategy<T> {

    /**
     * 获取对象的表名
     * 查询单个对象时调用本方法获取表名
     *
     * @param table   模板表的表名
     * @param primary 记录主键
     *
     * @return
     */
    public String getTable(String table, Serializable primary);

    /**
     * 获取对象的表名
     * 查询、修改、删除对象时调用本方法获取表名
     * 注意: 需保证FilterNode过滤的结果集合必须在一个数据库表中
     *
     * @param table 模板表的表名
     * @param node  过滤条件
     *
     * @return
     */
    public String getTable(String table, FilterNode node);

    /**
     * 获取对象的表名
     * 新增对象或更新单个对象时调用本方法获取表名
     *
     * @param table 模板表的表名
     * @param bean  实体对象
     *
     * @return
     */
    public String getTable(String table, T bean);
}
                

        DistributeTableStrategy分表策略需要实现三个接口,模板表由实体类的@Table注解提供。Redkale默认实现的MySQL数据库的拷贝表结构语句,其他数据库类型需要通过指定source.properties 中的 tablenotexist-sqlstatestablecopy-sqltemplate 来配置。

@DistributeTable(strategy = LoginRecord.TableStrategy.class)
public class LoginRecord extends BaseEntity {

    @Id
    @Column(comment = "主键ID; 值=create36time(9位)+'-'+UUID(32位)")
    private String loginid = ""; //主键ID; 值=create36time(9位)+'-'+UUID(32位)

    @Column(updatable = false, comment = "C端用户ID")
    private long userid; //C端用户ID

    @Column(updatable = false, comment = "登录网络类型; wifi/4g/3g")
    private String netMode = ""; //登录网络类型; wifi/4g/3g

    @Column(updatable = false, comment = "APP版本信息")
    private String appVersion = ""; //APP版本信息

    @Column(updatable = false, comment = "APP操作系统信息")
    private String appos = ""; //APP操作系统信息

    @Column(updatable = false, comment = "登录时客户端信息")
    private String loginAgent = ""; //登录时客户端信息

    @Column(updatable = false, comment = "登录时的IP")
    private String loginAddr = ""; //登录时的IP

    @Column(updatable = false, comment = "创建时间")
    private long createTime; //创建时间

    /** 以下省略getter setter方法 */


    //创建对象
    public static void main(String[] args) throws Throwable {
        LoginRecord record = new LoginRecord();
        long now = System.currentTimeMillis();
        record.setCreateTime(now); //设置创建时间
        record.setLoginid(Utility.format36time(now) + "-" + Utility.uuid());  //主键的生成规则
        //....  填充其他字段
        source.insert(record);
    }


    public static class TableStrategy implements DistributeTableStrategy<LoginRecord> {

        private static final String dayformat = "%1$tY%1$tm%1$td"; //一天一个表

        private static final String yearformat = "%1$tY";  //一年一个库

        //过滤查询时调用本方法
        @Override
        public String getTable(String table, FilterNode node) {
            Serializable day = node.findValue("#day");  //LoginRecord没有day字段,所以前面要加#,表示虚拟字段, 值为yyyyMMdd格式
            if (day != null) getTable(table, (Integer) day, 0L); //存在#day参数则直接使用day值
            Serializable time = node.findValue("createTime");  //存在createTime则使用最小时间,且createTime的范围必须在一天内,因为本表以天为单位建表
            return getTable(table, 0, (time == null ? 0L : (time instanceof Range ? ((Range.LongRange) time).getMin() : (Long) time)));
        }

        //创建或单个查询时调用本方法
        @Override
        public String getTable(String table, LoginRecord bean) {
            return getTable(table, 0, bean.getCreateTime());
        }

        //根据主键ID查询单个记录时调用本方法
        @Override
        public String getTable(String table, Serializable primary) {
            String id = (String) primary;
            return getTable(table, 0, Long.parseLong(id.substring(0, 9), 36));
        }

        private String getTable(String table, int day, long createTime) { //day为0或yyyyMMdd格式数据
            int pos = table.indexOf('.');
            String year = day > 0 ? String.valueOf(day / 10000) : String.format(yearformat, createTime); //没有day取createTime
            return "platf_login_" + year + "." + table.substring(pos + 1) + "_" + (day > 0 ? day : String.format(dayformat, createTime));
        }
    }
}
                

         如上范例,用户登陆记录的分表分库策略为一年一个库,一个库中365张表,每天一个表。为了分表策略的三个接口均得到实现,需要对主键ID的生成规则进行一定的设计。常见的场景是查询单个用户的登录列表。上面的范例就无法满足查询单个用户的登录信息需求,而分表策略又只能根据一种规则生成,因此需要按用户维度存在另外一张表中。

@DistributeTable(strategy = LoginUserRecord.TableStrategy.class)
public class LoginUserRecord extends BaseEntity {

    @Id
    @Column(comment = "记录ID; 值=userid+'-'+UUID")
    private String seqid = ""; //记录ID; 值=userid+'-'+UUID

    @Column(updatable = false, comment = "C端用户ID")
    private long userid; //C端用户ID

    @Column(comment = "LoginRecord主键")
    private String loginid = ""; //LoginRecord主键

    @Column(updatable = false, comment = "创建时间")
    private long createTime; //创建时间

    /** 以下省略getter setter方法 */

    public static class TableStrategy implements DistributeTableStrategy<LoginUserRecord> {

        @Override
        public String getTable(String table, LoginUserRecord bean) {
            return getTable(table, bean.getUserid());
        }

        @Override
        public String getTable(String table, FilterNode node) {
            Serializable id = node.findValue("userid");
            if (id != null) return getTable(table, id);
            return getHashTable(table, (Integer) node.findValue("#hash"));
        }

        @Override
        public String getTable(String table, Serializable primary) {
            String id = (String) primary;
            return getHashTable(table, (int) (Long.parseLong(id.substring(0, id.indexOf('-'))) % 100));
        }

        private String getHashTable(String table, int hash) {
            int pos = table.indexOf('.');
            return "platf_login." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
        }

    }
}
                

         如上,表LoginUserRecord只存储用户ID与登录信息ID的关联关系,以用户ID取模100进行hash存储,获取用户登录列表时,先查询LoginUserRecord一页的数据,再根据loginid查询LoginRecord实体。常见的分表策略是时间和主键hash,例如用户信息表采用主键hash分表:

@DistributeTable(strategy = UserDetail.TableStrategy.class)
public class UserDetail extends BaseEntity {

    @Id
    private long userid; //用户ID

    @Column(length = 64, comment = "用户昵称")
    private String userName = ""; //用户昵称

    @Column(length = 32, comment = "手机号码")
    private String mobile = ""; //手机号码

    @Column(length = 64, comment = "密码")
    @ConvertColumn(ignore = true, type = ConvertType.ALL)
    private String password = ""; //密码

    @Column(length = 128, comment = "备注")
    private String remark = ""; //备注

    @Column(updatable = false, comment = "创建时间")
    private long createTime; //创建时间

    /** 以下省略getter setter方法 */
    

    public static class TableStrategy implements DistributeTableStrategy<UserDetail> {

        @Override
        public String getTable(String table, UserDetail bean) {
            return getTable(table, bean.getUserid());
        }

        @Override
        public String getTable(String table, FilterNode node) {
            Serializable id = node.findValue("userid");
            if (id != null) return getTable(table, id);
            return getHashTable(table, (Integer) node.findValue("#hash"));
        }

        @Override
        public String getTable(String table, Serializable userid) {
            return getHashTable(table, (int) (((Long) userid) % 100));
        }

        private String getHashTable(String table, int hash) {
            int pos = table.indexOf('.');
            return "platf_user." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
        }

    }
}
                

         如上,用户表以userid取模100进行hash分表,若需要提供根据手机号查询单个用户信息,则需要另外存在一个用户ID对应手机号码的关系表,同样可以以手机号后两位数字为hash存储。

CacheSource 入门

         CacheSource同Memcached类似,像一个带有过期功能地Map容器,存放key-value数据。常见的使用场景就是存放HTTP的Session信息。Redkale把用户会话信息数据当做业务数据处理,而不是接入层的数据。WebSocket的连接态数据也是用CacheSource存储。key为WebSocket的groupid,value为WebSocket服务端节点的IP地址列表。

public class UserService implements Service {

    //用户简单信息缓存
    private final Map<Integer, UserInfo> users = new ConcurrentHashMap<>();

    //使用CacheSource必须要指明泛型
    @Resource(name = "usersessions")
    protected CacheSource sessions;

    //登录
    public RetResult<UserInfo> login(LoginBean bean) { //bean.sessionid 在接入层进行赋值
        UserInfo user = null;
        // 登陆逻辑 user = ...
        users.put(user.getUserid(), user);
        sessions.setLong(600, bean.getSessionid(), user.getUserid()); //session过期时间设置为10分钟
        return new RetResult<>(user);
    }

    //获取当前用户信息
    public UserInfo current(String sessionid) { //给HTTP的BaseServlet用
        Long userid = sessions.getexLong(sessionid);
        return userid == null ? null : users.get(userid.intValue());
    }

    //注销
    public void logout(String sessionid) {
        sessions.remove(sessionid);
    }
}

        以上是个简单的范例,用于用户模块存放sessionid。

source.properties 配置说明

# CacheSource   @Resource(name="usersession")
# type可以不用设置,框架会根据url判断使用哪个CacheSource实现类
redkale.cachesource[usersession].type = org.redkalex.cache.redis.RedisCacheSource
# 最大连接数
redkale.cachesource[usersession].maxconns = 16
# 节点地址
redkale.cachesource[usersession].node[0].url = redis://127.0.0.1:6363
# 节点密码
redkale.cachesource[usersession].node[0].password = 12345678
# 节点db
redkale.cachesource[usersession].node[0].db = 0



# DataSource   @Resource(name="platf")
# type可以不用设置,框架会根据url判断使用哪个DataSource实现类,默认值: org.redkale.source.DataJdbcSource
redkale.datasource[platf].type = org.redkale.source.DataJdbcSource
# 是否开启缓存(标记为@Cacheable的Entity类),值目前只支持两种: ALL: 所有开启缓存。 NONE: 关闭所有缓存, 非NONE字样统一视为ALL
redkale.datasource[platf].cachemode = ALL
# 是否自动建表当表不存在的时候, 目前只支持mysql、postgres, 默认为false
redkale.datasource[platf].table-autoddl = false
# 用户
redkale.datasource[platf].user = root
# 密码
redkale.datasource[platf].password = 12345678
# 多个URL用;隔开,如分布式SearchSource需要配多个URL
redkale.datasource[platf].url = jdbc:mysql://127.0.0.1:3306/platf?allowPublicKeyRetrieval=true&amp;rewriteBatchedStatements=true&amp;serverTimezone=UTC&amp;characterEncoding=utf8
# 最大连接数,默认值:CPU数
redkale.datasource[platf].maxconns = 16
# 包含的SQL模板,相当于反向LIKE,不同的JDBC驱动的SQL语句不一样,Redkale内置了MySQL的语句
redkale.datasource[platf].contain-sqltemplate = LOCATE(${keystr}, ${column}) > 0
# 包含的SQL模板,相当于反向LIKE,不同的JDBC驱动的SQL语句不一样,Redkale内置了MySQL的语句
redkale.datasource[platf].notcontain-sqltemplate = LOCATE(${keystr}, ${column}) = 0
# 复制表结构的SQL模板,Redkale内置了MySQL的语句
redkale.datasource[platf].tablenotexist-sqlstates = 42000;42S02
# 复制表结构的SQL模板,Redkale内置了MySQL的语句
redkale.datasource[platf].tablecopy-sqltemplate = CREATE TABLE IF NOT EXISTS ${newtable} LIKE ${oldtable}


# DataSource 读写分离
redkale.datasource[platf].read.url = jdbc:mysql://127.0.0.1:3306/platf_r?allowPublicKeyRetrieval=true&amp;rewriteBatchedStatements=true&amp;serverTimezone=UTC&amp;characterEncoding=utf8
redkale.datasource[platf].read.user = root
redkale.datasource[platf].read.password = 12345678

redkale.datasource[platf].write.url = jdbc:mysql://127.0.0.1:3306/platf_w?allowPublicKeyRetrieval=true&amp;rewriteBatchedStatements=true&amp;serverTimezone=UTC&amp;characterEncoding=utf8
redkale.datasource[platf].write.user = root
redkale.datasource[platf].write.password = 12345678