Push vs Pull integration
Pull is:
Accounting <-- getAccountBalance() --- Sales
-------- balance --------->
Push is:
Accounting -- AccountBalanceChanged--> Sales
- With pull model we get tight coupling of the systems. With push systems are loosely coupled. System receiving events uses denormalizers to build whatever structural model is needs.
- Why is push generally better than pull?
- What if we put Accounting system in Poland and Sales in South Africa?
- performance will suck when using pull integration model
- performance won't be affected as Sales will build it's own view model that can be queried without calling Accounting system
- weakest link antipattern will hurt systems using pull integration
- web services (think -> pull) cause Bounded Contexts boundaries to blur - my team needs to understand how other applications look at my system
- push reduces coupling between project teams - we don't have to wait for other teams to implement their functionality
- doing push means that we don't pollute our system with concepts of other systems
- replacing a system with a new one
- hard in PULL model (have to support how everyone sees our system)
- easy in PUSH (have to support only events)
- push keeps us from having a huge, messy canonical model
- With push integration we apply the same pattern we did for aggregates - reducing coupling through redundancy
- When can pull be beneficial?
- when complex calculations must be performed on the data and we don't want to put such logic in every system
- data from other system is vital for the business
- it's hard to emulate PUSH with an adapter on top of another system
- out of events coming from other systems we can build any possible structural model we need
- a system publishes a language other systems can listen to
- PUSH should be the default integration model
- we can degrade our SLAs in order to achieve higher uptime
- it's better to degrade SLA that being down
- having errors is often better than being down
- we introduce eventual consistency
- if risk goes too high because of stale data the business can hit the red button to bring the system down
- people are afraid of push integration because they are control freaks
- they like to have a central point that manages everything
- sending heartbeat messages ("hey, i'm still alive") to let other systems know that we're running fine so that they can act accordingly in case we are down
- with push we can do remote calculations without pissing off the users
- push makes eventual consistency explicit (we still have it implicit in PULL but prefer not to think about it)
- doing push == applying OO principles between systems
Versioning "is dead simple"
- wouldn't it be easy if we only added things?
- Let's consider version 1:
void deactivate() {// ...
Apply(new ItemDeactivated(id);
}
}
class InventoryItemDeactivated:Event{
public readonly Guid id;
InventoryItemDeactivated(Guid id){...}
}
- We'll move to version 2:
class InventoryItemDeactivated:Event{
public readonly Guid id;
InventoryItemDeactivated(Guid id){...}
}
// instead just copy & paste & rename:
public class InventoryItemDeactivated_V2:Event{
public final Guid id;
public final String comment;
InventoryItemDeactivated_V2(Guid id,String comment)
{...}
}
class InventoryItem {
void deactivate(String comment) {
if(comment.isNull())
}
class InventoryItem {
void deactivate(String comment) {
if(comment.isNull())
throw new ArgNullEx();
Apply(new ItemDeactivated_V2(id, comment);
// ...
}
}
Apply(new ItemDeactivated_V2(id, comment);
// ...
}
}
- copy & paste the apply() method to handle V2 event
- but - as no business logic needs the comment so we don't event copy it into an aggregate
- what about V57? gets a little dirty...
- new version of event is convertable from the old version of event
- if i can't transform v1 to v2 it's not the same event type!!!!
- new fields get default value in case of old version events
- let's have a method that converts event to newer version
InventoryItemDeactivatedEvent e){
return new InventoryItemDeactivatedEvent_V2(e.id, "BEFORE COMMENTS");
return new InventoryItemDeactivatedEvent_V2(e.id, "BEFORE COMMENTS");
// or another default value
}
public final Guid itemId;
public final int originalVersion;
// constructor...
}
class DeactivateInventoryItem_V2:Command{
final Guid itemId;
public final int originalVersion;
public final String comment;
// constructor
}
- now we can delete code that deals with old versions of events
- we have to version our commands with exactly the same pattern
public final Guid itemId;
public final int originalVersion;
// constructor...
}
class DeactivateInventoryItem_V2:Command{
final Guid itemId;
public final int originalVersion;
public final String comment;
// constructor
}
//let's jump into command handler:
[Depreciated("13/04/2011")]
public void handle(DeactivateInventoryItem m) {
var item = repo.getById(m.id);
item.deactivate("");
}
public void handle(DeactivateInventoryItem_V2 m) {
var item = repo.getById(m.id);
item.deactivate(m.comment);
}
public final Guid id;
// removed: public final String comment;
InventoryItemDeactivated_V3(Guid id){...}
}
//in the convert() function just don't copy the comment!
Merging
public class MergingHandler : Consumes {
public MergingHandler(Consumes next) {...}
public void consume(T message) {
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
foreach(var e in commit) {
if(conflictsWith(message,e))
throw new RealConcurrencyEx();
}
next.handle(message);
}
}
// following code assumes usage of UOW
public class MergingHandler : Consumes {
public MergingHandler(Consumes next) {...}
public void consume(T message) {
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
next.handle(message);
foreach(var e in commit) {
foreach(var attempted in UnitOfWork.Current.PeakAll()) {
// events that have been created by the aggregate during the operation
if(conflictsWith(attempted,e))
throw new RealConcurrencyEx();
}
}
}
}
public class MergingHandler : Consumes {
public MergingHandler(Consumes next) {...}
public void consume(T message) {
try {
BEGIN:
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
next.handle(message);
foreach(var e in commit) {
foreach(var attempted in UnitOfWork.Current.PeakAll()) {
if(conflictsWith(attempted,e))
throw new RealConcurrencyEx();
}
}
//normally that would be in another cmd handler:
UnitOfWork.current.commit();
}catch(ConcurrencyException e) {
goto BEGIN; // don't do that in production :)
}
}
}
Eventual consistency
NEVER USE WORD "INCONSISTENT" WITH BUSINESS PERSON. SAY "OLD", "STALE" ETC
[Depreciated("13/04/2011")]
public void handle(DeactivateInventoryItem m) {
var item = repo.getById(m.id);
item.deactivate("");
}
public void handle(DeactivateInventoryItem_V2 m) {
var item = repo.getById(m.id);
item.deactivate(m.comment);
}
- we don't need any support for versioning in our serialization infrastructure
- generally we keep 2-3 versions of a command and delete old versions(both handler and command) after some time
- "how many test you web pages with IE4? why? don't you wanna support them?"
- keeping multiple versions running concurrently lets the clients do the transition
- we never change events!!
- we add a new event
- a deleting change example: v3 without the comment:
public final Guid id;
// removed: public final String comment;
InventoryItemDeactivated_V3(Guid id){...}
}
//in the convert() function just don't copy the comment!
- snapshots (using memento pattern):
- do it like commands - add a new handling method and keep it until it's no longer needed, then delete it
- to prevent events & commands from being changed
- don't write them, generate them from XSD
- use some tool to detect changes made to XSD and reject checkins
- bigger problem: we realize that our aggregate boundaries were wrong, what's now?
- write a little script to break events apart:
- build the original aggregate, build a new aggregate from it and save it (keep the reference (id) to the old aggregate)
- this is annoying task but doesn't happen very often
- keeping the reference to original aggreagate help other systems integrated in PUSH way (like our read model?) keep their model intact
- prefer flat events over those containing little data objects - this is a trade-off between coupling and duplication
- it's harder to measure coupling than duplication so normally we don't see those problems
- most of the time we introduce coupling to avoid duplication because duplication is easier to spot
- flat events don't have problems when a data object definition changes (how would we version that?)
Merging
- how to get optimal level concurrency?
- merging prevents most of the problems with optimistic concurrency
public class MergingHandler : Consumes
public MergingHandler(Consumes
public void consume(T message) {
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
foreach(var e in commit) {
if(conflictsWith(message,e))
throw new RealConcurrencyEx();
}
next.handle(message);
}
}
- doesn't comparing commands to events seem wrong?
- duplicates the business logic from the domain (aggregate)
// following code assumes usage of UOW
public class MergingHandler : Consumes
public MergingHandler(Consumes
public void consume(T message) {
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
next.handle(message);
foreach(var e in commit) {
foreach(var attempted in UnitOfWork.Current.PeakAll()) {
// events that have been created by the aggregate during the operation
if(conflictsWith(attempted,e))
throw new RealConcurrencyEx();
}
}
}
}
- we can often have general rules for generic conflict detection, like:
- events of same type tend to conflict
- unfortunately, the above example still misses an important thing...
public class MergingHandler : Consumes
public MergingHandler(Consumes
public void consume(T message) {
try {
BEGIN:
var commit = eventStore.getEventsSinceVersion(
message.AggregateId,message.ExpectedVersion);
next.handle(message);
foreach(var e in commit) {
foreach(var attempted in UnitOfWork.Current.PeakAll()) {
if(conflictsWith(attempted,e))
throw new RealConcurrencyEx();
}
}
//normally that would be in another cmd handler:
UnitOfWork.current.commit();
}catch(ConcurrencyException e) {
goto BEGIN; // don't do that in production :)
}
}
}
- this is simple because we store events - try doing it on sql database with current state data!
- in case of conflict rules that are not generic but domain-specific we usually add a conflictsWith(Event another) method on the event
Eventual consistency
- don't ask experts: "does the data needs to be eventuall consistent?"
- ask: "is it ok to have data that is X time old"
NEVER USE WORD "INCONSISTENT" WITH BUSINESS PERSON. SAY "OLD", "STALE" ETC
- for business people inconsistent=wrong
- how to get around problems with eventual consistency:
- easy thing: "your comment is waiting for moderation"
- last thing to do when everything else fails: fake the changes in the client. make it look like things have happened for the user making the changes
- UI design & correct user's expectations
- educate the user:
- tell them that sometimes software takes a second to think about what it's doing.
- if the data is not there immediately, wait 2 seconds and press F5.
- if it's still not there immediately call tech support
- after 1st week users get the point and will wait a bit longer if required
- "they'are not all idiots"
- use task-based UIs to make system look consistent (maximize time between sending commands and issuing a query on the client)
- do we have to handle everything in the same pipe? maybe we can high- and low-priority pipes for different things in the system?
- Set-based validation
- what about validating that all usernames must be unique?
- we only have consistency within a single AR
- do we want to an AllUsers aggregate? erm, maybe not...
- ask: how bad is if two users get created with same username withing 500ms of each other?
- we can see that something is wrong in an event handler (not a part of read model) and for example send an email?
- if we don't trust our clients we can put a validating layer on top of command endpoint checking the constraints in the read layer (but anyway - if the don't behave well they just get bad user experience)
- more often than not if you ask about this topic you'll get redirected to this post
- REMEMBER: solve problems in a business-centric way
- put a queue in front of the command handlers
- traffic spikes won't overload the system
- but we can't ACK/NACK the command - we say we accepted the command and assume it will work
- client has to be "pretty damn certain that the command won't fail"
- might want to provide some minimal validation just before putting cmd into the queue
- most people just don't need such architecture, but one-way command pattern is extermaly valuable when they do
- most message-oriented middleware isn't service bus
- point-to-point == observer pattern
- easy, great choice with only a few of queues to set up
- gets complex with many connections, not scalable in this case
- hub & spoke - middle-man observer pattern
- we end up buying tibco or biztalk and start putting a lot of logic into it (workflows ...) and it quickly becomes a tangled mess
- watching messages flow within organization is easy (debugging too)
- single point of failure - when hub is down everything is down
- service bus
- we distribute the routing information
- single point of failure no longer exists
- can be hard to manage from network perspective
- is a gross overkill in most cases
- debugging message flows becomes a pain
- extra features offered by service buses cause lots of logic to be put into transport
- a bit of humour: IP over Avian Carriers
- big lol but...
- "never underestimate the throughput of a truck full of DVDs - highly latent, huge bandwidth"
Sagas
- what is a saga?
- long-running business process? "long" can mean different things ;)
- something that spans multiple transaction boundaries and ensures a process of getting back to a known good state if we fail in one of the transactions
- got some hand-made drawings but don't feel like trying to re-create them in GIMP. why can't I find on Linux something as easy to use as M$ Paint?)
- most companies get their competitive advantage not from a single system but from a bunch of interoperating systems
- we need a facilitator instead of a bunch of business experts from specific domains
- the PHBs in suits talking about kanban & lean (process optimization person - we don't want to act as one in this situation)
- sagas do not contain business logic
- set up a set of dependencies:
- who
- needs
- what
- when?
- sagas move data to the right place at the right time for someone else to do the job
- saga always starts in response to a single event coming out of domain model
- choreographs the process and makes sure we reach the end
- use a correlation id to know which events are related
- most of the cases it's a part of the message.
- we might have multiple correlation ids.
- sagas are state machines
- but we don't have implement it as one (few people think in state machines)
- between events saga goes to sleep ( join calculus (think: wait, Future etc, continuations))
- saga does the routing logic
- it does not create data, just routes it between systems
- some things have to happen before some amount of time passes
- like in the movie Memento
- no long term memory, have someone else providing information
- use alarm clock for that - pass it a message that is an envelope for the message saga will send (?)
- we want to avoid having state if possible, it should appear when we need it
- types of sagas:
- request-response based sagas
- document based sagas
- commands & events from individual systems become (are starting point for ) ubiquitous language
- a saga often starts another saga (for example for handling rollbacks)
- dashboards might be easily created from sagas data store (select * from sagastate ...)
- if such a process is really important for our business why don't we model it (explicitly)?
- sagas are extremally easy to test
- small DSL for describing sagas
- prove that you always exit correctly
- generate all possible paths to exit
- document oriented process
- like with paper documents multiple persons use & fill with more info
- most processes we try to implement has already been done before computers, on paper
- but we forgot how we did it (and do the analysis again)
- document based sagas are what you need in such cases
- in case of big documents we don't send the whole document back and forth, we set up some storage for them and only send the links
- RULE OF THUMB FOR VERSIONING SAGAS
- when i release a new version all sagas already running stay in old version, all new will be run in new version (unless we've found a really bad bug in old implementation)
- changing running sagas is dangerous and should be avoided
- this rule makes versioning simple
Scaling writes
- we only guarantee CA out from CAP on the write side so we can't partition it
- we can do real-time systems with CQRS
- stereotypical architecture: single db, multiple app servers with load balancer in front
- pros
- fault tolerance
- can do software upgrade without going down
- knowledge about it is widespread
- cons
- app servers must be stateless!
- can't be scaled (just buy a bigger database)
- database remains a single point of failure
- database might be a performance bottleneck
- it's good but has limitations
- let's replace the database with a event store!
- there's no functional difference between this solution and previous one
- loading aggregates on each request increases latency
- we might split event store into multiple stores, based on aggregate ID (sharding)
- this can (theoretically) go as far as having a single event store per aggregate
- problem happens when one of the datastores goes down
- we could multiply them with a master-slave pattern
- but: each slave increases latency
- this allows scaling out our event store
- in order to reduce latency we can switch from stateless to statefull app servers
- we have a message router (with fast, in-memory routing info) which knows which aggregate resides in each app server
- loaded aggregate stays in memory of the app server
- over time event store becomes write-only
- when a app server goes down message router must distribute it's job among other servers
- this can cause latency spike unacceptable for some real-time systems
- to solve the problem we can use a warm replica
- just as in previous example but:
- when message is routed to a server another server is told to shadow the aggregate that the message was directed to
- shadowing server loads the AR and subscribes to it's events
- events are delivered to shadowing systems by a publisher
- stays ~100ms behind original write
- can use UDP multicast for publishing events
- when a server goes down shadowing server is only 100ms behind it and requires small operation to catch up with current state
- this greatly reduces the latency spike when a server is going down
- but...
- we can get rid of the spike completely!
- when shadowing server receives first command it can act as if it was up-to-date
- but still listen to events from event store!
- until it gets events it created itself it tries to merge
- same code as regular events merging!
- when it does get its own events it unsubscribes
- many businesses will accept the risk of possible merging problems to avoid latency spikes
- with this architecture there are no more reads from the event store!
Occasionally connected systems
My notes here are barely readable drawings on paper with some (even less readable) text here and there. Will unfortunately have to skip it (I'm certainly NOT doing those drawings in GIMP!) but...
Greg already had a presentation on this subject recorded. It covers the same topics (watched it few days before the class).
The interesting thing here is the conclusion: CQRS is nothing else as plain, old, good MVC (as initially done in Smalltalk) brought to architectural level.
None of these ideas are new.
Isn't it cool?
The important lesson is:
Review what you have already done.
== END OF DAY 3 ===
and unfortunately of the whole training. A pity, I wouldn't mind at all spending few more days attending to such a great class! Thanks a lot for it, Greg!
Brak komentarzy:
Prześlij komentarz