Source 组件介绍

         Source 主要为数据源提供简易的操作接口,使底层的具体数据源(传统数据库、文件系统、内存数据库、Memcached/Redis缓存)对上层是透明的。其提供两种类型的数据源:DataSource 和 CacheSource。DataSource 为数据库或内存数据库,提供类似JPA、Hibernate的接口与功能。CacheSource 为缓存数据提供类似Memcached、Redis的接口和功能。两者也提供了异步接口(基于远程模式Service)。

DataSource 入门

        JPA虽已提供了简洁成熟的数据库操作接口,但当数据、业务量很庞大的时候就显得捉襟见肘,与JPA相比,DataSource有以下几个特点:
                 1、简易的过滤查询接口,但仅支持简单的表关联查询。
                 2、简化分表分库操作。
                 3、动态更新变更后的数据库连接参数。
                 4、读写分离的简易配置。
                 5、进程间的缓存同步。
                 6、提供异步接口。
        数据库操作方面常见的是过滤查询操作,JPA规范中的JPQL虽然简化了SQL,但是对于动态产生的过滤条件,开发人员还是无法免去组装过滤条件的过程(无论JPQL还是CriteriaQuery), DataSource定义了FilterBean接口可以省略组装条件的过程,FilterNode提供了类似CriteriaQuery的功能,且这两种对象都可以序列化,给远程模式Service提供了基础,微服务架构提倡服务之间尽量降低耦合,因此DataSource仅支持简单的关联查询,复杂的表关联查询或统计应放在数据分析系统中。一个服务通常部署多个进程,若用JPA的缓存则进程之间的缓存无法同步,而DataSource采用SNCP协议即可方便地达到自动同步缓存功能。JPA无法在主数据库异常时动态切换到备份数据库,DataSource会监听persistence.xml文件,当文件配置发生变化时自动切换新的数据库配置。
        为了降低学习成本,DataSource重用了JPA里的部分注解与配置文件,使用方法基本相同,与JPA用法的区别是注解只能标记于字段,不能标记在方法上。

注解类名功能描述
javax.persistence.Cacheable标记Entity类是否需要缓存,与JPA用法一致
javax.persistence.Column标记字段,只使用其name()、insertable()、updatable()属性
javax.persistence.EntityJPA的Entity类必须标记为@Entity, 而Redkale不强制要求,该注解一般较少使用
javax.persistence.GeneratedValue仅用于标记主键是否为数据库自增长
javax.persistence.Id标记主键字段,与JPA用法一致
javax.persistence.Table标记表的别名,与JPA用法一致
javax.persistence.Transient标记是否为表对应的字段,与JPA用法一致
以下是Redkale自定义的注解
@VirtualEntity用于非数据库表对应的Entity类,且仅用于开启缓存模式的DataSource
@DistributeTable标记表进行分表分库存储, 与DistributeTableStrategy接口结合使用
@FilterColumn用于FilterBean过滤类的字段设置
@FilterJoinColumn用于FilterBean过滤类的关联表字段设置
@FilterGroup用于FilterBean过滤类的过滤条件分组设置

        操作数据源主要使用的对象有 DataSource、FilterBean、FilterNode。DataSource 提供的接口分几种系列:

系列方法功能描述
insert插入数据
delete删除数据
update更新数据
updateColumn更新数据的部分字段
getNumberXXX统计查询,用于查询字段的总和、最大值、平均值等数据
queryColumnXXX单个字段数据查询和字段的统计查询
find查找单个对象
queryList查询对象的List集合
querySheet查询对象的Sheet页式集合
directXXX直接运行SQL语句,用于复杂的关联查询与更新

        以上接口除了directXXX,其他都有等同的异步接口。insert、delete、update接口与JPA同名接口用法一样。DataSource提供了丰富的查询接口,且有独特的翻页查询功能。每以系列的方法主要重载三类: 单个字段过滤、FilterBean过滤和FilterNode过滤。
        返回类型为CompletableFuture的接口均为异步接口
        开发者可以借鉴 Redkale-demo 中的 AutoClassCreator的代码根据数据表自动生成Entity代码。

    过滤条件

        FilterBean、FilterNode对象用于构造过滤条件。FilterBean可以转化为FilterNode。FilterBean主要用于接收外界构建的过滤条件,而FilterNode为了构建内部的过滤条件且降低过滤条件变化的耦合性,FilterNode中name值以#开头的视为虚拟字段,不会构建成过滤条件,仅供分布分库的DistributeTableStrategy策略使用。

public class UserBean implements FilterBean {

    private int userid;

    @FilterColumn(express = FilterExpress.LIKE)
    private String username;

    private Range age;

    public UserBean(int userid, String username, Range age) {
        this.userid = userid;
        this.username = username;
        this.age = age;
    }

    /** 以下省略getter setter方法 */

}



 new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于 
 FilterNode.create("userid", 200001).and("username", FilterExpress.LIKE, "redkale").and("age", new Range.IntRange(14, 36))
 
 new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND username LIKE '%redkale%' AND age BETWEEN 14 AND 36"
 new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND username LIKE '%redkale%'"
 new UserBean(0, "redkale", null) 等价于 "WHERE username LIKE '%redkale%'"

                

        如上定义UserBean过滤条件,当非数值类字段值为null、字符串值为空、数值类字段值小于@FilterColumn.least()值(least的默认值为1)都不会构建成过滤条件。@FilterColumn.express根据字段的类型有不同的默认值,若字段类型为Collection子类或数组则express默认为FilterExpress.IN;若字段类型为Range的子类则express默认为FilterExpress.BETWEEN;其他类型则express默认为FilterExpress.EQUAL。默认字段之间是AND关系,若想使用OR关系则需要使用@FilterGroup进行标记:

public class UserBean implements FilterBean {

    private int userid;

    @FilterGroup("[OR]a")
    @FilterColumn(express = FilterExpress.LIKE)
    private String username;

    @FilterGroup("[OR]a")
    private Range age;

    public UserBean(int userid, String username, Range age) {
        this.userid = userid;
        this.username = username;
        this.age = age;
    }

    /** 以下省略getter setter方法 */

}



 new UserBean(200001, "redkale", new IntRange(14, 36)) 等价于 
 FilterNode orNode = FilterNode.create("username" , FilterExpress.LIKE, "redkale").or("age", new Range.IntRange(14, 36)); 
 FilterNode node = FilterNode.create("userid", 200001).and(orNode);
 
 new UserBean(200001,"redkale",new IntRange(14,36)) 等价于 "WHERE userid=200001 AND (username LIKE '%redkale%' OR age BETWEEN 14 AND 36)"
 new UserBean(200001, "redkale", null) 等价于 "WHERE userid = 200001 AND username LIKE '%redkale%'"
 new UserBean(0, "redkale", null) 等价于 "WHERE username LIKE '%redkale%'"


 source.getNumberResult(User.class, FilterFunc.COUNT, null, new UserBean(0, "redkale", new IntRange(14, 36))).intValue() 等价于
 "SELECT COUNT(*) FROM user WHERE username LIKE '%redkale%' AND  age BETWEEN 14 AND 36"
                

        如上@FilterGroup 的value 必须是[OR]或者[AND]开头,没有标记@FilterGroup的字段等价于标记了@FilterGroup(value = "[AND]")。[AND]、[OR]后面的字符串为GROUP_NAME,默认的GROUP_NAME为空字符串。如上"[OR]a"可以直接使用"[OR]",有多个[OR]或者[AND]则需要加上不同的NAME。

    分表分库

        DataSource提供了单个实体类对应多个数据库表的功能,通常流水型的数据量比较大,单个数据库无法存储,DataSource提供了简单的分表操作,同时在接口设计上尽量减少单表操作与分表操作的差异。分表分库只需在实体类上注解@DistributeTable并实现DistributeTableStrategy分表策略即可。

public interface DistributeTableStrategy<T> {

    /**
     * 获取对象的表名
     * 查询单个对象时调用本方法获取表名
     *
     * @param table   模板表的表名
     * @param primary 记录主键
     *
     * @return
     */
    public String getTable(String table, Serializable primary);

    /**
     * 获取对象的表名
     * 查询、修改、删除对象时调用本方法获取表名
     * 注意: 需保证FilterNode过滤的结果集合必须在一个数据库表中
     *
     * @param table 模板表的表名
     * @param node  过滤条件
     *
     * @return
     */
    public String getTable(String table, FilterNode node);

    /**
     * 获取对象的表名
     * 新增对象或更新单个对象时调用本方法获取表名
     *
     * @param table 模板表的表名
     * @param bean  实体对象
     *
     * @return
     */
    public String getTable(String table, T bean);
}
                

        DistributeTableStrategy分表策略需要实现三个接口,模板表由实体类的@Table注解提供。Redkale默认实现的MySQL数据库的拷贝表结构语句,其他数据库类型需要通过指定persistence.xml 中的 javax.persistence.tablenotexist.sqlstatesjavax.persistence.tablecopy.sqltemplate 来配置。

@DistributeTable(strategy = LoginRecord.TableStrategy.class)
public class LoginRecord extends BaseEntity {

    @Id
    @Column(comment = "主键ID; 值=UUID+create36time")
    private String loginid = ""; //主键ID; 值=UUID+create36time

    @Column(updatable = false, comment = "C端用户ID")
    private long userid; //C端用户ID

    @Column(updatable = false, comment = "登录网络类型; wifi/4g/3g")
    private String netmode = ""; //登录网络类型; wifi/4g/3g

    @Column(updatable = false, comment = "APP版本信息")
    private String appversion = ""; //APP版本信息

    @Column(updatable = false, comment = "APP操作系统信息")
    private String appos = ""; //APP操作系统信息

    @Column(updatable = false, comment = "登录时客户端信息")
    private String loginagent = ""; //登录时客户端信息

    @Column(updatable = false, comment = "登录时的IP")
    private String loginaddr = ""; //登录时的IP

    @Column(updatable = false, comment = "创建时间")
    private long createtime; //创建时间

    /** 以下省略getter setter方法 */
   

    //创建对象
    public static void main(String[] args) throws Throwable {
        LoginRecord record = new LoginRecord();
        long now = System.currentTimeMillis();
        record.setCreatetime(now); //设置创建时间
        String create36time = Long.toString(now, 36); //时间的36进制
        if (create36time.length() < 9) create36time = "0" + create36time; //当前时间值的36进制只可能是8位或9位,不足9位填充0
        record.setLoginid(Utility.uuid() + create36time);  //主键的生成规则
        //....  填充其他字段
        source.insert(record);
    }

    public static class TableStrategy implements DistributeTableStrategy<LoginRecord> {

        private static final String dayformat = "%1$tY%1$tm%1$td";

        private static final String yearformat = "%1$tY";

        //过滤查询时调用本方法
        @Override
        public String getTable(String table, FilterNode node) {
            Serializable day = node.findValue("#day");
            if (day != null) getTable(table, (Integer) day, 0L); //存在#day参数则直接使用day值
            Serializable time = node.findValue("#createtime");  //存在createtime则使用最小时间,且createtime的范围必须在一天内,因为本表以天为单位建表
            return getTable(table, 0, (time == null ? 0L : (time instanceof Range ? ((Range.LongRange) time).getMin() : (Long) time)));
        }

        //创建或单个查询时调用本方法
        @Override
        public String getTable(String table, LoginRecord bean) {
            return getTable(table, 0, bean.getCreatetime());
        }

        //根据主键ID查询单个记录时调用本方法
        @Override
        public String getTable(String table, Serializable primary) {
            String loginid = (String) primary;
            String create36time = loginid.substring(loginid.length() - 9); //固定最后9位为创建时间的36进制值
            return getTable(table, 0, Long.parseLong(create36time, 36));
        }

        private String getTable(String table, int day, long createtime) {
            int pos = table.indexOf('.');
            String year = (day > 0 ? "" + day / 10000 : String.format(yearformat, createtime));
            return "platf_login_" + year + "." + table.substring(pos + 1) + "_" + (day > 0 ? day : String.format(dayformat, createtime));
        }
    }
}
                

         如上范例,用户登陆记录的分表分库策略为一年一个库,一个库中365张表,每天一个表。为了分表策略的三个接口均得到实现,需要对主键ID的生成规则进行一定的设计。常见的场景是查询单个用户的登录列表。上面的范例就无法满足查询单个用户的登录信息需求,而分表策略又只能根据一种规则生成,因此需要按用户维度存在另外一张表中。

@DistributeTable(strategy = LoginUserRecord.TableStrategy.class)
public class LoginUserRecord extends BaseEntity {

    @Id
    @Column(comment = "记录ID; 值=userid+'-'+UUID")
    private String seqid = ""; //记录ID; 值=userid+'-'+UUID

    @Column(updatable = false, comment = "C端用户ID")
    private long userid; //C端用户ID

    @Column(comment = "LoginRecord主键")
    private String loginid = ""; //LoginRecord主键

    @Column(updatable = false, comment = "创建时间")
    private long createtime; //创建时间

    /** 以下省略getter setter方法 */

    public static class TableStrategy implements DistributeTableStrategy<LoginUserRecord> {

        @Override
        public String getTable(String table, LoginUserRecord bean) {
            return getTable(table, bean.getUserid());
        }

        @Override
        public String getTable(String table, FilterNode node) {
            Serializable id = node.findValue("userid");
            if (id != null) return getTable(table, id);
            return getHashTable(table, (Integer) node.findValue("#hash"));
        }

        @Override
        public String getTable(String table, Serializable primary) {
            String id = (String) primary;
            return getHashTable(table, (int) (Long.parseLong(id.substring(0, id.indexOf('-'))) % 100));
        }

        private String getHashTable(String table, int hash) {
            int pos = table.indexOf('.');
            return "platf_login." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
        }

    }
}
                

         如上,表LoginUserRecord只存储用户ID与登录信息ID的关联关系,以用户ID取模100进行hash存储,获取用户登录列表时,先查询LoginUserRecord一页的数据,再根据loginid查询LoginRecord实体。常见的分表策略是时间和主键hash,例如用户信息表采用主键hash分表:

@DistributeTable(strategy = UserDetail.TableStrategy.class)
public class UserDetail extends BaseEntity {

    @Id
    private long userid; //用户ID

    @Column(length = 64, comment = "用户昵称")
    private String username = ""; //用户昵称

    @Column(length = 32, comment = "手机号码")
    private String mobile = ""; //手机号码

    @Column(length = 64, comment = "密码")
    @ConvertColumn(ignore = true, type = ConvertType.ALL)
    private String password = ""; //密码

    @Column(length = 128, comment = "备注")
    private String remark = ""; //备注

    @Column(updatable = false, comment = "创建时间")
    private long createtime; //创建时间

    /** 以下省略getter setter方法 */
    

    public static class TableStrategy implements DistributeTableStrategy<UserDetail> {

        @Override
        public String getTable(String table, UserDetail bean) {
            return getTable(table, bean.getUserid());
        }

        @Override
        public String getTable(String table, FilterNode node) {
            Serializable id = node.findValue("userid");
            if (id != null) return getTable(table, id);
            return getHashTable(table, (Integer) node.findValue("#hash"));
        }

        @Override
        public String getTable(String table, Serializable userid) {
            return getHashTable(table, (int) (((Long) userid) % 100));
        }

        private String getHashTable(String table, int hash) {
            int pos = table.indexOf('.');
            return "platf_user." + table.substring(pos + 1) + "_" + (hash > 9 ? hash : ("0" + hash));
        }

    }
}
                

         如上,用户表以userid取模100进行hash分表,若需要提供根据手机号查询单个用户信息,则需要另外存在一个用户ID对应手机号码的关系表,同样可以以手机号后两位数字为hash存储。

CacheSource 入门

         CacheSource同Memcached类似,像一个带有过期功能地Map容器,存放key-value数据。常见的使用场景就是存放HTTP的Session信息。Redkale把用户会话信息数据当做业务数据处理,而不是接入层的数据。WebSocket的连接态数据也是用CacheSource存储。key为WebSocket的groupid,value为WebSocket服务端节点的IP地址列表。

public class UserService implements Service {

    //用户简单信息缓存
    private final Map<Integer, UserInfo> users = new ConcurrentHashMap<>();

    //使用CacheSource必须要指明泛型
    @Resource(name = "usersessions")
    protected CacheSource<Integer> sessions;

    //登录
    public RetResult<UserInfo> login(LoginBean bean) { //bean.sessionid 在接入层进行赋值
        UserInfo user = null;
        // 登陆逻辑 user = ...
        users.put(user.getUserid(), user);
        sessions.set(600, bean.getSessionid(), user.getUserid()); //session过期时间设置为10分钟
        return new RetResult<>(user);
    }

    //获取当前用户信息
    public UserInfo current(String sessionid) { //给HTTP的BaseServlet用
        Integer userid = sessions.getAndRefresh(sessionid);
        return userid == null ? null : users.get(userid);
    }

    //注销
    public void logout(String sessionid) {
        sessions.remove(sessionid);
    }
}

        以上是个简单的范例,用于用户模块存放sessionid。

persistence.xml 配置说明

<!-- 其配置算是标准的JPA配置文件的缩略版 -->
<persistence>	
    <!-- 系统基本库 -->
    <persistence-unit name="demouser">
        <!-- 为NONE表示不启动缓存,@Cacheable 失效; 非NONE值(通常用ALL)表示开启缓存。 -->
        <shared-cache-mode>NONE</shared-cache-mode>
        <properties>
            <!-- 
                DataSource的实现类,没有设置默认为org.redkale.source.DataJdbcSource的实现,使用常规基于JDBC的数据库驱动一般无需设置
            -->
            <property name="javax.persistence.datasource" value="org.redkale.source.DataJdbcSource"/>
            
            <property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbuser?characterEncoding=utf8"/>
            <!-- 
                javax.persistence.jdbc.driver在JPA的值是JDBC驱动,Redkale有所不同,值应该是javax.sql.DataSource的子类。 
                为了兼容用户习惯,Redkale内置常见JDBC驱动到javax.sql.DataSource的映射关系:
                                     org.mariadb.jdbc.Driver  ——————  org.mariadb.jdbc.MySQLDataSource
                                       org.postgresql.Driver  ——————  org.postgresql.ds.PGConnectionPoolDataSource
                                       com.mysql.jdbc.Driver  ——————  com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource
                             oracle.jdbc.driver.OracleDriver  ——————  oracle.jdbc.pool.OracleConnectionPoolDataSource
                com.microsoft.sqlserver.jdbc.SQLServerDriver  ——————  com.microsoft.sqlserver.jdbc.SQLServerConnectionPoolDataSource
                因此 com.mysql.jdbc.Driver 会被自动转换成 com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource
                并且如果JDBC驱动是以上几个版本,javax.persistence.jdbc.driver属性都可以省略,Redkale会根据javax.persistence.jdbc.url的值来识别驱动
            -->
            <property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
            <property name="javax.persistence.jdbc.user" value="root"/>
            <property name="javax.persistence.jdbc.password" value="123456"/>
            
            <!-- 最大连接数,默认值:CPU数*16  -->
            <property name="javax.persistence.connections.limit" value="32"/>
            
            <!--  包含的SQL模板,相当于反向LIKE,不同的JDBC驱动的SQL语句不一样,Redkale内置了MySQL的语句 -->
            <property name="javax.persistence.contain.sqltemplate" value="LOCATE(${keystr}, ${column}) > 0"/>
            <property name="javax.persistence.notcontain.sqltemplate" value="LOCATE(${keystr}, ${column}) = 0"/>
            
            <!--  复制表结构的SQL模板,Redkale内置了MySQL的语句 -->
            <property name="javax.persistence.tablenotexist.sqlstates" value="42000;42S02"/>
            <property name="javax.persistence.tablecopy.sqltemplate" value="CREATE TABLE ${newtable} LIKE ${oldtable}"/>
           
        </properties>
    </persistence-unit>
    <!-- IM消息库 -->
    <persistence-unit name="demoim">
        <shared-cache-mode>NONE</shared-cache-mode>
        <properties>
            <!-- jdbc:mysql://127.0.0.1:3306/dbim?autoReconnect=true&amp;autoReconnectForPools=true&amp;characterEncoding=utf8 -->
            <property name="javax.persistence.jdbc.url" value="jdbc:mysql://127.0.0.1:3306/dbim?characterEncoding=utf8"/>
            <property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
            <property name="javax.persistence.jdbc.user" value="root"/>
            <property name="javax.persistence.jdbc.password" value="123456"/>
        </properties>
    </persistence-unit>
</persistence>