Actor是一种高并发处理模型,每个Actor都有着自己的状态有序消息处理机制,所以在业务处理的情况并不需要制定锁的机制,从而达到更高效的处理能性。XRPC
是一个基于远程接口调用的RPC组件,它可以简单地实现高性能的远程接口调用;XRPC在创建远程接口时是支持针对接口创建对应的Actor实例。当创建接口Actor
后,所有Client针对这一例实例Actor的所有方法调用都是有序处理。以下介绍如何在XRPC创建并使用Actor
什么场景适用于Actor
既然每个Actor都有着自己的状态和顺序处理机制,那可以针对这优点进行相关业务的展开;在棋牌游戏中的桌子可以是一个Actor,车票里的每一辆车可以是一个
Actor,秒杀里的每一种商品是一个Actor。在这些Actor
所有的操作都是有序进行,不存在锁,也不需要事务(通过EventSourcing来保障)和不会产生死锁;数据变更全内存操作,通过这样可以大提高业务的处理性能。
引用XRPC
Install-Package BeetleX.XRPC
定义Actor的RPC服务
XRPC支持的Actor服务功能是建立在EventNext之上,它的好处是直接接口行为的Actor
创建,而不是传统的消息加接收方法,这样在应用设计和调用上也是非常方便灵活。接下来定义一个简单的Actor服务
* 接口定义 1 public interface IAmountService 2 { 3 Task<int> Income(int amount);
4 Task<int> Payout(int amount); 5 Task<int> Get(); 6 } 以上是一个简单的帐记变更行为接口
*
实现接口
1 [Service(typeof(IAmountService))] 2 public class AmountService :
ActorState, IAmountService 3 { 4 5 private int mAmount; 6 7 public override
Task ActorInit(string id) 8 { 9 return base.ActorInit(id); 10 } 11 12
public Task<int> Get() 13 { 14 return mAmount.ToTask(); 15 } 16 17 public
Task<int> Income(int amount) 18 { 19 mAmount += amount; 20 return
mAmount.ToTask();21 } 22 23 public Task<int> Payout(int amount) 24 { 25
mAmount -= amount; 26 return mAmount.ToTask(); 27 } 28 }
*
启动对应的RPC服务
private static XRPCServer mXRPCServer; static void Main(string[] args) {
mXRPCServer= new XRPCServer(); mXRPCServer.ServerOptions.LogLevel =
LogType.Error; mXRPCServer.Register(typeof(Program).Assembly);
mXRPCServer.Open(); Console.Read(); }
以上代码是在默认端口9090上绑定RPC服务,可以通过运行日志查看服务启动情况
创建远程Actor调用
* 创建RPC Client 1 client = new XRPCClient("192.168.2.18", 9090); 2
client.Connect(); 以上代码是创建一个RPC客户端,通过它的Create可以创建接口代理
* 创建接口Actor实例 IAmountService henry = client.Create<IAmountService>("henry");
IAmountService ken= client.Create<IAmountService>("ken"); 以上是针对IAmountService
创建了两个Actor对象,这两个对象的操作都是相互隔离互不干扰;每个Actor
对象中的方法在并发下都是有序执行,并不会产生线程安全问题,所以在不同方法中操作对像的数据成员都不需要锁来保证数据安全性。
测试
为了更好地验证Actor的隔离和并发安全性,简单地并发测试一下
1 for (int i = 0; i < concurrent; i++) 2 { 3 var task = Task.Run(async ()
=> 4 { 5 for (int k = 0; k < requests; k++) 6 { 7 await henry.Income(10);
8 System.Threading.Interlocked.Increment(ref mCount); 9 } 10 }); 11
tasks.Add(task);12 task = Task.Run(async () => 13 { 14 for (int k = 0; k <
requests; k++) 15 { 16 await henry.Payout(10); 17
System.Threading.Interlocked.Increment(ref mCount); 18 } 19 }); 20
tasks.Add(task);21 task = Task.Run(async () => 22 { 23 for (int k = 0; k <
requests; k++) 24 { 25 await ken.Income(10); 26
System.Threading.Interlocked.Increment(ref mCount); 27 } 28 }); 29
tasks.Add(task);30 task = Task.Run(async () => 31 { 32 for (int k = 0; k <
requests; k++) 33 { 34 await ken.Payout(10); 35
System.Threading.Interlocked.Increment(ref mCount); 36 } 37 }); 38
tasks.Add(task);39 } 40 await Task.WhenAll(tasks.ToArray()); 41 double useTime
= EventCenter.Watch.ElapsedMilliseconds - start; 42 Console.WriteLine($"
Completed count:{mCount}|use time:{useTime}|rps:{(mCount / useTime *
1000d):###.00} |henry:{await henry.Get()},ken:{await ken.Get()}");
两个程序同时在本机跑了一下,在50并发的情况大概是11万RPS
服务中的Actor隔离性
服务是通过名称来实例化接口的不同Actor,同一服务即使多个Client同时对一名称的Actor进行创建服务也可以保证它的唯一性。
完整示例代码
https://github.com/IKende/XRPC/tree/master/Samples/Actors
<https://github.com/IKende/XRPC/tree/master/Samples/Actors>
热门工具 换一换