通过CQRS/事件溯源框架Reveno实现高负载交易事务处理

来源:InfoQ 作者: 邵思华
  

 

在当今世界,事务的处理每时每刻都在发生,其范围包括使用关系型数据库进行订购处理的个体零售网站,乃至每秒进行10多万次处理的实时交易系统。

Reveno是一个全新的无锁事务处理框架,它支持JVM平台,并基于CQRS及事件溯源模式实现。虽然它只是一个简单而强大的工具,但在性能方面也毫不逊色。所有事务都被持久化为只读的日志,并且可以通过按顺序重演事件的方式恢复领域模型的最新状态。所有的运行时操作都是在内存中执行的,从而使其吞吐量可达到每秒种几百万次事务的数量级,平均延迟时间则限制在微秒级。虽然Reveno的功能如此强大,但它仍然是一个通用目的的框架,涵盖了大量不同种类的用例,并提供了丰富的引擎配置选项。举例来说,你可以调整它的持久性配置,在极其随意(为了提高系统吞吐量)与高度受限(对于数据丢失的情况具有较低的容忍度)之间进行选择。

对于不同的用例来说,其需求也是千差万别的。作为一个通用框架,它需要考虑所有不同的可能性。在本文中,我将通过示例为读者展现如何使用Reveno框架开发一个简单的交易系统。

首先让我们来了解一下Reveno如何处理你的领域模型。作为一个基于CQRS的框架,Reveno会将你的领域切分为事务模型与查询模型。对这两个模型的定义没有任何限制,你无需使用任何强制性的注解、基类,甚至不需要实现Serializable接口,只需使用最简单的POJO就可以完成任务。

没有任何一种单一的途径能够应对所有不同的用例,因此应用程序的设计者需要决定如何处理事务模型的回滚操作。Reveno为实现模型对象提供了两种主要方式:可变与不可变模型。这两种方式各有不同的底层处理机制,并且各有其优缺点。在Java中,不可变对象的开销很低,并且无需进行同步操作,这种方式目前非常流行。Reveno能够非常高效地处理不可变对象,但每种使用不可变类型的领域都会产生额外的垃圾对象,Reveno也不例外。因此,如果你能够接受一些额外的GC开销,那么就应当将这种方式作为默认的选择。反之,如果使用可变模型,那么在进行事务运行时,就要为已使用的对象生成额外的序列化快照,而这将影响到你的性能。幸运的是,如果你坚持使用可变模型,并且仍需要保证性能的最大化及GC影响的最小化,那么你可以选择一种额外的可变模型特性,“补偿行为”(Compensating Actions)。简单来说,补偿行为是指在实现常规的事务处理函数时,一并实现的一种手动回滚行为。如果读者想了解这方面的更多细节,请参考Reveno的官方文档页面

现在,我们已经设计好了一些基本规则,那么让我们进行一些实际的编码工作吧。在我们所设计的交易系统中存在大量的帐户,每个帐户可以有零至多个订单。我们必须提供各种维护性操作,例如帐户的创建和订单的处理等等。在实际应用中,这种类型的系统可能需要应对大量的负载,例如每秒种处理10个直至50万个事务,甚至更多。更复杂的是,此类系统对于延迟非常敏感,而频繁出现的负载峰值可能会直接造成财务损失。

安装

如果你正在使用一些流行的构建工具,例如Maven、Gradle、Sbt等等,那么你可以在Maven Central中加入一个Reveno的依赖。目前为止,共有3个可用的库能够选择:

  • reveno-core —— 包括所有Reveno核心功能包,负责引擎的初始化和事务处理等等。
  • reveno-metrics —— 这个库中的包负责从运行中的引擎中收集各种指标,并将这些指标传递给Graphite、Slf4j等工具。
  • reveno-cluster —— 支持在主-从架构的集群中运行Reveno,以提供故障转移的能力。

你可以在Reveno安装页面中找到完整的安装指南与示例。

定义事务模型

让我们首先从领域模型的定义开始我们的开发过程。正如我们之前所说,领域模型将通过简单的POJO进行创建。我个人倾向于使用不可变对象,因为这将大大简化整个工作。它们不仅能够绕开各种并发问题,并且最重要的一点在于,由于不需要保留已访问对象的快照,因此它在Reveno中有非常出色的性能表现。Reveno允许我们直接使用不可变对象(可以说,Reveno也成为了一个帮助我们学习如何在常规的Java应用中处理不可变性的优秀教程)。

让我们首先定义一个表现系统中典型的交易帐户的实体(为了简单起见,我们将实例变量都定义为public,但在实际应用中并不存在这种限制):

public class TradeAccount {
    public final long id;
    public final long balance;
    public final String currency;
    private final LongSet orders;

    public TradeAccount(long id, String currency) {
        this(id, 0, currency, new LongOpenHashSet());
    }

    private TradeAccount(long id, long balance, 
                         String currency, LongSet orders) {
        this.id = id;
        this.balance = balance;
        this.currency = currency;
        this.orders = orders;
    }

    public LongSet orders() {
        return new LongOpenHashSet(orders);
    }
}

正如我们所见,这个类是不可变的。但这种值对象并不具备任何功能,往往因此被人称为“贫血”对象。因此,更好的方式是让TradeAccount类能够实现一些实用的功能,例如处理订单以及进行货币计算:

public class TradeAccount {
    public final long id;
    public final long balance;
    public final String currency;
    private final LongSet orders;

    public TradeAccount(long id, String currency) {
        this(id, 0, currency, new LongOpenHashSet());
    }

    private TradeAccount(long id, long balance, 
                         String currency, LongSet orders) {
        this.id = id;
        this.balance = balance;
        this.currency = currency;
        this.orders = orders;
    }

    public TradeAccount addBalance(long amount) {
        return new TradeAccount(id, balance + amount, currency, orders);
    }

    public TradeAccount addOrder(long orderId) {
        LongSet orders = new LongOpenHashSet(this.orders);
        orders.add(orderId);
        return new TradeAccount(id, balance, currency, orders);
    }

    public TradeAccount removeOrder(long orderId) {
        LongSet orders = new LongOpenHashSet(this.orders);
        orders.remove(orderId);
        return new TradeAccount(id, balance, currency, orders);
    }

    public LongCollection orders() {
        return new LongOpenHashSet(orders);
    }

}

现在,这个类就变得非常实用了。在开始讲述实际的订单处理细节之前,首先要说明一下Reveno如何使用它的事务模型。所有的实体都会保存在某个repository中,任何类型的处理函数都可以访问该repository(我们稍后将对此进行详细地讲解)。这些实体相互之前通过ID进行引用,并通过ID在repository中进行访问。由于内部的性能优化机制,所有的ID都限制为long类型。

Order类的定义也与之类似,为了简便起见,我们将忽略这部分源代码。不过,你可以在GitHub上下载完整的示例代码,并在本文的末尾找到更多的链接。

定义查询模型

我们已经简单地探索了如何创建Reveno中的事务模型。从逻辑上说,查询功能的定义也同样关键。在Reveno中,查询是通过“视图”的定义而创建的,每个视图都表现了事务模型中的某些实体。除了定义视图类之外,你还应当为每种视图类型提供映射器。我们稍后将对细节进行深入讲解。

当一个事务成功地完成之后,Reveno将对所改变的实体进行映射操作,以保证视图的更新发生在命令完成之前。在默认情况下,Reveno中的查询模型是保存在内存中的。让我们为TradingAccount类定义一个视图:

public class TradeAccountView {
    public final double balance;
    public final Set<OrderView> orders;

    public TradeAccountView(double balance, Set<OrderView> orders) {
        this.balance = balance;
        this.orders = orders;
    }
}

TradingAccountView类中还包括其他种类视图(在这个示例中对应着OrderView)的一个集合,在进行查询、序列化、JSON格式转换等操作时,这种方式能够带来很大的便利。Reveno映射器支持多种实用的方法,以简化将ID的集合映射到视图的集合等操作。我们稍后将进行一些实际操作。

定义命令与事务行为

为了在Reveno中执行事务,我们必须首先执行一个“命令”对象。命令对象本身可以是一个简单的POJO,它需要在系统中注册一个特定的处理函数。通常来说,命令将用于执行某些聚合与校验逻辑,以只读方式访问repository。但最重要的是,命令需要履行它的职责,以发送各种“事务行为”(因此命令也被称为“状态转变器”)。

事务行为是用于在领域模型中进行状态改变的组件,它通过对repository的读-写访问以执行。事务行为对象本身可以表现为一个POJO,并在系统中注册对应的处理函数。所有行为组合在一起成为一个单一的原子性事务,它包含在当前所执行命令的范围内。在成功执行完成之后,事务行为将被持久化至底层的存储引擎中,并且在重启或发生任何故障之后重演其状态。

在这个交易系统中,我们需要创建新的交易帐户,并设定初始的余额。与之前的做法一样,我们首先要定义一个事务命令:

public class CreateAccount {
    public final String currency;
    public final double initialBalance;

    public CreateAccount(String currency, double initialBalance) {
        this.currency = currency;
        this.initialBalance = initialBalance;
    }

    public static class CreateAccountAction {
        public final CreateAccount info;
        public final long id;

        public CreateAccountAction(CreateAccount info, long id) {
            this.info = info;
            this.id = id;
        }
    }
}

我们实际上共创建了两个类。CreateAccount作为命令,CreateAccountAction则作为事务行为。通常来说,这种切分方式并非强制性的。如果命令与事务行为数据完全匹配,那么你可以放心地重用同一个类。但在这个示例中,我们所获取的货币数值类型为double(例如来自于某些遗留的终端系统),而在内部引擎中,货币的数值将保存为long,以确保其精确度能够完全匹配。

现在,我们就可以初始化一个Reveno引擎,并定义命令与事务行为的处理函数了:

Reveno reveno = new Engine(pathToEngineFolder);

reveno.domain().command(CreateAccount.class, long.class, (c, ctx) -> {
    long accountId = ctx.id(TradeAccount.class);
    ctx.executeTxAction(new CreateAccount.CreateAccountAction(c, accountId));
    if (c.initialBalance > 0) {
        ctx.executeTxAction(new ChangeBalance(
                            accountId, toLong(c.initialBalance)));
    }
    return accountId;
});

reveno.domain().transactionAction(CreateAccount.CreateAccountAction.class,
                               (a, ctx) -> ctx.repo().store(a.id, 
                                new TradeAccount(a.id, a.info.currency)));

reveno.domain().transactionAction(ChangeBalance.class, 
                               (a, ctx) -> ctx.repo().
                                remap(a.accountId, TradeAccount.class, 
                               (id, e) -> e.addBalance(a.amount))
);

这段代码包括的内容很多,让我们仔细分析一下。首先,我们定义了一个CreateAccount命令处理函数,它负责生成下一个帐户ID,并执行了一个事务命令以进行帐户的创建。如果在定义时传入了初始的余额,则还需执行ChangeBalance这个事务行为。需要指出的是,ctx.executeTxAction这个方法调用不会阻塞。当命令处理函数成功完成之后,所有事务行为都会在一个单一的进程中执行。因此,如果需要在任何一个TxAction处理函数中进行回滚操作,那么这些事务行为所生成的改动都将被顺利回滚(实际的回滚机制是基于事务模型等实现的)。

将实体映射至查询模型

由于事务模型与查询模型是分离的,因此我们需要定义一些映射器,将实体转换为对应的视图表现。不过,我们并不需要在代码中明确地调用这些映射方法,因为Reveno会自动发现repository中的“脏”实体,并调用相应的映射方法。让我们看看TraceAccount是如何映射到TradeAccountView的:

reveno.domain().viewMapper(TradeAccount.class, 
                           TradeAccountView.class, (id,e,r) ->
                            new TradeAccountView(fromLong(e.balance), 
                            r.linkSet(e.orders(), OrderView.class)));

这段代码中的id指代实体的标识符,e指代实体本身,而r指代一个特殊的映射上下文,其中包含各种实用的方法。实际完成映射工作的是r.linkSet(..)方法,它将以延迟的方式将ID指针的集合映射至实际视图的集合。

我们可以通过相同的方式定义Order至OrderView之间的映射:

reveno.domain().viewMapper(Order.class, OrderView.class, (id,e,r) ->
        new OrderView(fromLong(e.price), e.size, e.symbol, 
                      r.get(TradeAccountView.class, e.accountId))); 

正如你所见,我们的查询模型也是通过不可变对象组成的,这一点与事务模型中的实体一样,它将极大地简化映射逻辑。再次强调,虽然这一点并非强制约束,但如若不然,则我们必须自行负责映射逻辑的正确性。

执行命令

Reveno中的事务处理操作默认就是异步的。当你在一个运行中的引擎中执行某个命令时,该方法调用将立即返回一个CompletableFuture对象,最终将在将来某一时刻返回结果。Reveno在内部定义了一个具有多个阶段的“管道”,每个阶段将处理各自的线程。在这些管道中如果选择逐个传递对象会产生很大的消耗,因此在这里就可以使用批处理方式。在高负载情况下,Reveno会在每个阶段处理进行批处理。正因为如此,Reveno在一开始就为系统提供了高吞吐能力。

在完成了所有定义与业务逻辑实现之后,我们可以开始使用这个引擎了。首先我们需要启动它:

reveno.startup();

随后,我们就可以在系统中创建一个新的交易帐户了。你应该留意一点,executeCommand()方法也存在一个同步的版本,它对于测试以及编写示例非常有用:

long accountId = reveno.executeSync(new CreateAccount("USD", 5.15));

在这个示例中,Reveno内部将调用相应的命令以及事务行为处理函数,后者将为你创建一个新的美元帐户,并将初始余额设为5.15美元。我们可以通过以下方式检查它的正确性:

System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).balance);

这段代码将打印出“5.15”。为了让这个示例看起来更有趣,让我们为这个帐户添加一个新的订单:

long orderId = reveno.executeSync(
                        new MakeOrder(accountId, "EUR/USD", 1, 1.213));

我们在这里创建了一个以1.213美元购买1欧元的新订单。随后,我们可以再次检查帐户信息,以了解其中的变化:

System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).orders.size());

这一次所打印的结果是“1”,这表示这个帐户有一个未完成的订单。最后,让我们关闭这个订单,完成这个以美元购买欧元的操作,它将会使帐户中的余额减少1.213,最终余额为3.937美元。

reveno.executeSync(new ExecuteOrder(orderId));
// the balance is expected to be 3.937, after order successfully executed
System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).balance);

持久化

正如我们在介绍部分所说的一样,Reveno首先是一个事务处理框架。你可以在同一个目录下重启你的引擎,而仍然能看到模型的最新状态。我们尽可能使该框架的每个部分都具有可配置性,而一点与持久性同样相关。你可以通过调用reveno.config()方法查看所有的可选项。

发布事件

Reveno本身自带一个事件处理子系统。你的任务就是为它定义自己的事件类(通常来说定义为POJO)以及处理函数,并发布这些事件以处理事务行为。需要指出的重点在于,事件只有在命令执行成功之后才会被发布,并且完全是异步的。所有的视图映射过程都严格地发生于事件处理函数执行之前。

事件的执行结果同样会被持久化到存储引擎中,如果事件成功地完成了,那么在引擎重启之后通常也不会被再次处理。但这种行为并不能得到严格的保障,因此,如果你需要确保处理函数是100%幂等的,则应当检查EventMetadata.isReplay这个标记,在每个事件处理函数中都可以访问它。

让我们再次扩展一下这个示例,让它对于某个交易帐户中的余额变化事件进行发布与处理。首先,我们将定义这个事件,并添加适当的字段:

public class BalanceChangedEvent {
    public final long accountId;

    public BalanceChangedEvent(long accountId) {
        this.accountId = accountId;
    }
}

当某个帐户的余额产生变化时,我们只需要了解帐户的ID,因为我们可以在处理函数中查询相应的视图。我们将通过以下方法定义事件处理函数:

reveno.events().eventHandler(BalanceChangedEvent.class, (e, m) -> {
    TradeAccountView account = reveno.query().find(TradeAccountView.class, 
                                                   e.accountId);
    System.out.println(String.format(
                          "New balance of account %s from event is: %s", 
                        e.accountId, account.balance));
});

相应地,我们也需要在ChangeBalance事务行为处理函数的定义中添加一行代码:

reveno.domain().transactionAction(ChangeBalance.class, (a, ctx) -> {
        ctx.repo().remap(a.accountId, TradeAccount.class, 
         (id, e) -> e.addBalance(a.amount));
    // publish an event to all listeners
        ctx.eventBus().publishEvent(new BalanceChangedEvent(a.accountId));
});

由于ChangeBalance这个事务行为出现在多个命令中,当添加了这段事件发布代码后,我们就会不断收到它的事件。还有一点需要注意,publishEvent调用会立即返回,而事件的发布则是最终某一时刻才会发生的。最终,我们将看到以下输出:

New balance of account 1 from event is: 5.15

New balance of account 1 from event is: 3.937

性能检测

现在,整个示例已经可以运行了,那么让我们来看看这个应用能够处理怎样的负载。Reveno提供了一个非常实用的reveno-metrics库,能够帮助你追踪某个运行中引擎的性能指标。与其他部分一样,这个指标库也经过了优化,包括堆外内存(off-heap memory)的使用以及无锁的代码,因此它对于整体性能所造成的影响非常小。它还支持与某些流行的监控系统进行集成,例如Graphite。

(需要指出的是,reveno-metrics总的来说是一个性能监控工具,而不是一个微基准测试框架。如果要获取准确的基准测试结果,可以考虑使用JMH或类似的工具。)

我们选择的环境MacBook Pro 2.7 GHz i5 CPU,首先要在代码中对指标集合进行初始化,以使用Reveno Slf4j这个sink,随后重复运行ChangeBalance这个命令4千5百万次(包括用于预热的迭代):

  • reveno.instances.MAC-15_local.default.latency.mean: 68804
  • reveno.instances.MAC-15_local.default.latency.min: 775
  • reveno.instances.MAC-15_local.default.latency.max: 522265
  • reveno.instances.MAC-15_local.default.throughput.hits: 1183396

这些数字表示平均延迟时间约69微秒,最小值为775纳秒,而最高值则为522微秒,总吞量是每秒运行1183396次事务。考虑到后台所需完成的各种工作以及持久性级别,这一结果令人印象十分深刻。

结论

Reveno框架目前才刚刚崭露头角,但它的发展十分迅速。你可以访问我们的官方网站以学习更多的相关知识。我们对于任何建议以及反馈都保持开发的态度。你也可以加入我们的Google讨论小组,在Issues页面中提交bug,或是向mailto:support@reveno.org提交非公开问题或其他任何问题。

本文中所描述的Demo的完整代码可以在GitHub上找到,你还能够找到使用Reveno的各种示例(或者自行提交一个pull request)。

关于作者

Artem Dmitriev目前在GetIntent这家广告科技创业公司担任软件工程师,他的工作是交付大规模实时投标这方面需求的平台。近几年来,Artem致力于在JVM平台上创建高负载的系统。他的工作背景包括为多市场交易平台开发核心引擎,这些平台通常对于延迟与吞吐量有很高的要求。Artem对于开源软件及其开发充满热情,他热诚欢迎用户的反馈,可以通过art.dm.ser@gmail.com向他发送邮件。

 

查看英文原文High Load Trading Transaction Processing with Reveno CQRS/Event Sourcing Framework  


时间:2016-06-15 22:17 来源:InfoQ 作者: 邵思华 原文链接

好文,顶一下
(0)
0%
文章真差,踩一下
(0)
0%
------分隔线----------------------------


把开源带在你的身边-精美linux小纪念品
无觅相关文章插件,快速提升流量