~bento/vending-machine

9378fec723d7745ecf84f1b49859a117d898466d — Fabio Bento Luiz 8 months ago 4588d1c
implement dead letter topic handler
20 files changed, 193 insertions(+), 60 deletions(-)

M common/src/main/java/com/common/dto/VendingMachineDTO.java
M cqrs-core/src/main/java/com/cqrs/core/events/BaseEvent.java
M cqrs-core/src/main/java/com/cqrs/core/messages/Message.java
M frontend/src/components/TheNewMachineForm.vue
M frontend/src/stores/websocket-store.ts
M frontend/src/types/index.ts
M frontend/src/types/machine.ts
A frontend/src/types/websocket-state.ts
R machine-cmd/src/main/java/com/vm/machine/cmd/configuration/{ApplicationApiConfiguration.java => ApiConfiguration.java}
R machine-cmd/src/main/java/com/vm/machine/cmd/configuration/{Configuration.java => CommandConfiguration.java}
M machine-cmd/src/main/java/com/vm/machine/cmd/endpoints/MachineEndpoint.java
M machine-cmd/src/main/resources/application.properties
R machine-query/src/main/java/com/vm/machine/query/configuration/{SectionQueryApplication.java => ApiConfiguration.java}
R machine-query/src/main/java/com/vm/machine/query/configuration/{Configuration.java => QueryConfiguration.java}
A machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/DeadLetterTopicReader.java
M machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventsConsumer.java
M machine-query/src/main/java/com/vm/machine/query/endpoints/WebsocketEndpoint.java
M machine-query/src/main/resources/application.properties
R payment-cmd/src/main/java/com/vm/payment/cmd/configuration/{ApplicationApiConfiguration.java => ApiConfiguration.java}
R payment-cmd/src/main/java/com/vm/payment/cmd/configuration/{Configuration.java => CommandConfiguration.java}
M common/src/main/java/com/common/dto/VendingMachineDTO.java => common/src/main/java/com/common/dto/VendingMachineDTO.java +9 -0
@@ 6,6 6,7 @@ public class VendingMachineDTO {
  private String machineId;
  private String machineName;
  private List<SectionDTO> sections;
  private boolean processingError;

  public String getMachineId() {
    return machineId;


@@ 30,4 31,12 @@ public class VendingMachineDTO {
  public void setSections(List<SectionDTO> sections) {
    this.sections = sections;
  }

  public boolean isProcessingError() {
    return processingError;
  }

  public void setProcessingError(boolean processingError) {
    this.processingError = processingError;
  }
}

M cqrs-core/src/main/java/com/cqrs/core/events/BaseEvent.java => cqrs-core/src/main/java/com/cqrs/core/events/BaseEvent.java +4 -0
@@ 5,6 5,10 @@ import com.cqrs.core.messages.Message;
public class BaseEvent extends Message {
    private long version;

    public BaseEvent() {
        this(null, null);
    }

    public BaseEvent(String messageId, String aggregateId) {
        super(messageId, aggregateId);
    }

M cqrs-core/src/main/java/com/cqrs/core/messages/Message.java => cqrs-core/src/main/java/com/cqrs/core/messages/Message.java +12 -0
@@ 5,10 5,22 @@ public abstract class Message {
    private final String aggregateId;

    public Message(String messageId, String aggregateId) {
        //validateIds(messageId, aggregateId);

        this.messageId = messageId;
        this.aggregateId = aggregateId;
    }

    private static void validateIds(String messageId, String aggregateId) {
        if (messageId == null || messageId.isEmpty()) {
            throw new IllegalArgumentException("A message must have a valid ID for tracking purposes.");
        }

        if (aggregateId == null || aggregateId.isEmpty()) {
            throw new IllegalArgumentException("A message must have a valid aggregate ID.");
        }
    }

    public String getMessageId() {
        return messageId;
    }

M frontend/src/components/TheNewMachineForm.vue => frontend/src/components/TheNewMachineForm.vue +39 -11
@@ 3,7 3,7 @@
    <v-card class="mx-auto px-6 py-8" max-width="344">
      <h3>Create a new vending machine:</h3>
      <br />
      <v-form ref="newMachineForm" @submit.prevent v-model="formValid">
      <v-form ref="newMachineForm" @submit.prevent>
        <v-text-field
          v-model="machineName"
          :rules="nameRules"


@@ 55,11 55,11 @@ const nameRules = [
    return 'A name or address is required.'
  }
]
let formValid = false
let machineId = String(self.crypto.randomUUID())
let messageId = String(self.crypto.randomUUID())
let hasError = ref(false)
let isLoading = ref(false)
let createMachineTimeout = setTimeout(() => {}, 0)

const machineState = vendingMachineStore()



@@ 67,30 67,58 @@ const socketState = websocketStore()
socketState.init(machineId)

socketState.$subscribe((_, state) => {
  console.log('WS machine state changed...', state)

  if (state.machine.machineId) {
    isLoading.value = false
    machineState.loadNewMachine(state.machine)
    clearTimeout(createMachineTimeout)

    if (state.machine.processingError) {
      setError()
    } else {
      isLoading.value = false
      machineState.loadNewMachine(state.machine)
    }
  }
})

function reset() {
  hasError.value = false
  newMachineForm.value.reset()
  //newMachineForm.value.reset()
}

function submit() {
  newMachineForm.value.validate()
  if (!formValid) {
    console.log('invalid form', formValid)
async function submit() {
  const { valid } = await newMachineForm.value.validate()
  if (!valid) {
    console.log('Invalid form.', valid)
    return
  }

  if (!socketState.isConnected) {
    console.error('It is not connected with the back-end')
    setError()
    return
  }

  hasError.value = false
  isLoading.value = true
  createMachineTimeout = setTimeout(setError, 5000)
  machineState.newVendindMachine(machineId, machineName.value, messageId).catch((error) => {
    hasError.value = true
    isLoading.value = false
    clearTimeout(createMachineTimeout)
    setError()
    console.error(error)
  })
}

function setError() {
  hasError.value = true
  isLoading.value = false

  resetIds()
}

function resetIds() {
  machineId = String(self.crypto.randomUUID())
  messageId = String(self.crypto.randomUUID())
  socketState.init(machineId)
}
</script>

M frontend/src/stores/websocket-store.ts => frontend/src/stores/websocket-store.ts +4 -5
@@ 1,5 1,5 @@
import { defineStore } from 'pinia'
import { Machine } from '@/types';
import { Machine, WebsocketState } from '@/types';

const WS_URL = 'ws://localhost:5001/query-api/machine/';



@@ 7,11 7,11 @@ export const websocketStore = defineStore('websocket-store', {
    state: () => ({
        machine: {} as Machine,
        webSocket: {} as WebSocket,
        connected: false,
    }),
    getters: {
        updatedMachine: (state) => state.machine,
        machineId: (state) => state.machine.machineId,
        isConnected: (state) => state.webSocket.readyState === WebsocketState.OPEN,
    },
    actions: {
        init(machineId: string) {


@@ 19,6 19,8 @@ export const websocketStore = defineStore('websocket-store', {
                return
            }

            this.machine = {} as Machine

            this.webSocket = new WebSocket(WS_URL + machineId)
            this.webSocket.onopen = this.onOpen
            this.webSocket.onclose = this.onClose


@@ 27,17 29,14 @@ export const websocketStore = defineStore('websocket-store', {
        },
        onOpen(event: Event) {
            console.log("Websocket connected", event)
            this.connected = true
        },
        onClose(event: CloseEvent) {
            console.log("Websocket connection closed", event)
            this.connected = false
            this.init(this.machineId)
        },
        onMessage(event: MessageEvent) {
            console.log("Websocket message received", event.data)
            this.machine = JSON.parse(event.data)
            this.connected = false
        },
        onError(event: Event) {
            console.error("Websocket connection error", event)

M frontend/src/types/index.ts => frontend/src/types/index.ts +1 -0
@@ 2,3 2,4 @@ export * from './machine'
export * from './beverage-section'
export * from './coins-base'
export * from './payment'
export * from './websocket-state'

M frontend/src/types/machine.ts => frontend/src/types/machine.ts +1 -1
@@ 2,6 2,6 @@ import { BeverageSection } from ".";

export class Machine {
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    constructor(public machineId: string, public machineName: string, public sections: BeverageSection[] = []) {
    constructor(public machineId: string, public machineName: string, public sections: BeverageSection[], public processingError: boolean) {
    }
}
\ No newline at end of file

A frontend/src/types/websocket-state.ts => frontend/src/types/websocket-state.ts +6 -0
@@ 0,0 1,6 @@
export enum WebsocketState {
    CONNECTING = 0,
    OPEN = 1,
    CLOSING = 2,
    CLOSED = 3
}
\ No newline at end of file

R machine-cmd/src/main/java/com/vm/machine/cmd/configuration/ApplicationApiConfiguration.java => machine-cmd/src/main/java/com/vm/machine/cmd/configuration/ApiConfiguration.java +1 -1
@@ 4,5 4,5 @@ import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;

@ApplicationPath("/cmd-api")
public class ApplicationApiConfiguration extends Application {
public class ApiConfiguration extends Application {
}

R machine-cmd/src/main/java/com/vm/machine/cmd/configuration/Configuration.java => machine-cmd/src/main/java/com/vm/machine/cmd/configuration/CommandConfiguration.java +3 -3
@@ 14,9 14,9 @@ import org.jboss.logging.Logger;

@Unremovable
@ApplicationScoped
public class Configuration {
public class CommandConfiguration {

    private final Logger LOGGER = Logger.getLogger(Configuration.class);
    private final Logger LOGGER = Logger.getLogger(CommandConfiguration.class);
    private final CommandDispatcher commandDispatcher;
    private final CommandHandler commandHandler;



@@ 24,7 24,7 @@ public class Configuration {
        LOGGER.info("Registering command dispatcher handlers...");
    }

    public Configuration(CommandDispatcher commandDispatcher, CommandHandler commandHandler) {
    public CommandConfiguration(CommandDispatcher commandDispatcher, CommandHandler commandHandler) {
        this.commandDispatcher = commandDispatcher;
        this.commandHandler = commandHandler;
    }

M machine-cmd/src/main/java/com/vm/machine/cmd/endpoints/MachineEndpoint.java => machine-cmd/src/main/java/com/vm/machine/cmd/endpoints/MachineEndpoint.java +10 -2
@@ 9,6 9,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;
import org.jboss.logging.Logger;



@@ 24,13 25,20 @@ public class MachineEndpoint {

  @POST
  @Consumes({"application/json"})
  @Produces({"application/json"})
  @Path("/machine")
  public Response createMachine(MachineDto dto) {
    try {
      var command = new CreateVendingMachineCommand(dto);
      commandDispatcher.send(command);
    } catch (Exception e) {
      var safeMsg = "An unexpected error occurred loading the beverages.";

    } catch (IllegalArgumentException e){
      var safeMsg = "The message could not be processed due invalid arguments.";
      LOGGER.error(safeMsg, e);
      return Response.status(Response.Status.BAD_REQUEST).entity(safeMsg).build();
    }
    catch (Exception e) {
      var safeMsg = "An unexpected error occurred creating the machine.";
      LOGGER.error(safeMsg, e);
      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(safeMsg).build();
    }

M machine-cmd/src/main/resources/application.properties => machine-cmd/src/main/resources/application.properties +1 -2
@@ 3,7 3,6 @@ quarkus.live-reload.instrumentation=true
quarkus.http.port=5000
%prod.kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.vending-machine.connector=smallrye-kafka
mp.messaging.outgoing.vending-machine.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.vending-machine.cloud-events-mode=structured
mp.messaging.outgoing.vending-machine.cloud-events-mode=binary
mp.messaging.outgoing.vending-machine.cloud-events-source=machine-cmd
mp.messaging.outgoing.vending-machine.cloud-events-subject=Topic for vending machine related events

R machine-query/src/main/java/com/vm/machine/query/configuration/SectionQueryApplication.java => machine-query/src/main/java/com/vm/machine/query/configuration/ApiConfiguration.java +1 -1
@@ 4,5 4,5 @@ import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;

@ApplicationPath("/query-api")
public class SectionQueryApplication extends Application {
public class ApiConfiguration extends Application {
}

R machine-query/src/main/java/com/vm/machine/query/configuration/Configuration.java => machine-query/src/main/java/com/vm/machine/query/configuration/QueryConfiguration.java +3 -3
@@ 12,12 12,12 @@ import org.jboss.logging.Logger;

@Unremovable
@ApplicationScoped
public class Configuration {
    private final Logger LOGGER = Logger.getLogger(Configuration.class);
public class QueryConfiguration {
    private final Logger LOGGER = Logger.getLogger(QueryConfiguration.class);
    private final QueryDispatcher queryDispatcher;
    private final QueryHandler queryHandler;

    public Configuration(QueryDispatcher queryDispatcher, QueryHandler queryHandler) {
    public QueryConfiguration(QueryDispatcher queryDispatcher, QueryHandler queryHandler) {
        this.queryDispatcher = queryDispatcher;
        this.queryHandler = queryHandler;
    }

A machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/DeadLetterTopicReader.java => machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/DeadLetterTopicReader.java +35 -0
@@ 0,0 1,35 @@
package com.vm.machine.query.domain.queries.infra;

import com.common.dto.VendingMachineDTO;
import com.cqrs.core.events.BaseEvent;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

@ApplicationScoped
public class DeadLetterTopicReader {
private static final String ERROR = "error";
  private final Logger LOGGER = Logger.getLogger(DeadLetterTopicReader.class);

  @Incoming("dead-letter-machine")
  @Outgoing("machine-error-state")
  public VendingMachineDTO dead(Message<JsonObject> rejected) {
    IncomingKafkaRecordMetadata metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class)
        .orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka"));
    String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value());
    LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason);
    var event = rejected.getPayload().mapTo(BaseEvent.class);
    var machineErrorState = new VendingMachineDTO();
    machineErrorState.setMachineId(event.getAggregateId());
    machineErrorState.setProcessingError(true);

    rejected.ack();

    return machineErrorState;
  }
}

M machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventsConsumer.java => machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventsConsumer.java +22 -13
@@ 25,33 25,42 @@ public class VendingMachineEventsConsumer implements EventConsumer {

  @Blocking(ordered = false)
  @Incoming("vending-machine")
  @Outgoing("frontend-ack")
  @Outgoing("machine-state")
  @Acknowledgment(Acknowledgment.Strategy.MANUAL)
  public VendingMachineDTO process(Message<JsonObject> event) throws ClassNotFoundException {
    var incomingKafkaCloudEventMetadata = event.getMetadata(DefaultIncomingKafkaCloudEventMetadata.class);
    if(incomingKafkaCloudEventMetadata.isEmpty()) {
      throw new IllegalArgumentException("The event does not have Event Cloud metadata.");
  public VendingMachineDTO consume(Message<JsonObject> event) {
    VendingMachineDTO dto = null;

    try {
      dto = processEvent(event);
    } catch (Exception e) {
      event.nack(e);
      return null;
    }

    var metadata = incomingKafkaCloudEventMetadata.get();
    var eventClazzType = Class.forName(metadata.getType());
    event.ack();
    return dto;
  }

  private VendingMachineDTO processEvent(Message<JsonObject> event) throws ClassNotFoundException {
    var metadata = event.getMetadata(DefaultIncomingKafkaCloudEventMetadata.class)
        .orElseThrow(
            () -> new IllegalArgumentException("The event does not have Event Cloud metadata."));

    var eventClazzType = Class.forName(metadata.getType());

    var parsedEvent = event.getPayload().mapTo(eventClazzType);

    return switch (parsedEvent) {
      case VendingMachineCreatedEvent e -> {
        var machine = eventHandler.on(e);
        event.ack();
        yield  mapper.toMachineDTO(machine);
        yield mapper.toMachineDTO(machine);
      }
      case SectionCreatedEvent e -> {
        var machine = eventHandler.on(e);
        event.ack();
        yield  mapper.toMachineDTO(machine);
        yield mapper.toMachineDTO(machine);
      }
      default -> throw new IllegalArgumentException("Unknown event: " + parsedEvent.getClass().getTypeName());
      default -> throw new IllegalArgumentException(
          "Unknown event: " + parsedEvent.getClass().getTypeName());
    };

  }
}

M machine-query/src/main/java/com/vm/machine/query/endpoints/WebsocketEndpoint.java => machine-query/src/main/java/com/vm/machine/query/endpoints/WebsocketEndpoint.java +30 -8
@@ 4,13 4,10 @@ import com.common.dto.VendingMachineDTO;
import com.vm.machine.query.encoders.MachineDtoEncoder;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import jakarta.ws.rs.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.microprofile.reactive.messaging.Incoming;


@@ 31,7 28,7 @@ public class WebsocketEndpoint {
    var existentSession = sessions.get(machineId);
    if(existentSession != null) {
      if(!existentSession.isOpen()) {
        sessions.remove(machineId);
        //sessions.remove(machineId);
      }else {
        LOGGER.info("A session with the machine " + machineId + " already exists.");
        return;


@@ 41,9 38,8 @@ public class WebsocketEndpoint {
    sessions.putIfAbsent(machineId, session);
  }

  //@OnMessage
  @Incoming("frontend-ack")
  public void pushDto(VendingMachineDTO dto){
  @Incoming("machine-state")
  public void onMessage(VendingMachineDTO dto){
    if(dto == null) {
      return;
    }


@@ 57,7 53,33 @@ public class WebsocketEndpoint {

    if(!session.isOpen()){
      LOGGER.error("The session closed with the machine ID " + dto.getMachineId());
      sessions.remove(dto.getMachineId());
      //sessions.remove(dto.getMachineId());
      return;
    }

    session.getAsyncRemote().sendObject(dto, result -> {
      if (result.getException() != null) {
        LOGGER.error("Unable to send websocket message. Machine ID " + dto.getMachineId(), result.getException());
      }
    });
  }

  @Incoming("machine-error-state")
  public void onProcessingError(VendingMachineDTO dto){
    if(dto == null) {
      return;
    }

    var session = sessions.get(dto.getMachineId());

    if(session == null) {
      LOGGER.error("No session found for machine ID " + dto.getMachineId());
      return;
    }

    if(!session.isOpen()){
      LOGGER.error("The session closed with the machine ID " + dto.getMachineId());
      //sessions.remove(dto.getMachineId());
      return;
    }


M machine-query/src/main/resources/application.properties => machine-query/src/main/resources/application.properties +7 -4
@@ 1,19 1,22 @@
quarkus.devservices.enabled=false
quarkus.live-reload.instrumentation=true
quarkus.http.port=5001
#quarkus.datasource.jdbc=false
quarkus.datasource.jdbc.max-size=16
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=postgres
quarkus.datasource.password=postgres
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/vending-machine
#quarkus.datasource.reactive.url = vertx-reactive:postgresql://localhost:5432/vending-machine

# drop-and-create the database at startup (use `update` to only update the schema)
quarkus.hibernate-orm.database.generation = update

%prod.kafka.bootstrap.servers=kafka:9092
mp.messaging.incoming.vending-machine.connector=smallrye-kafka
#mp.messaging.incoming.vending-machine.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
mp.messaging.incoming.vending-machine.auto.offset.reset=earliest
mp.messaging.incoming.vending-machine.group.id=machine-projection
mp.messaging.incoming.vending-machine.cloud-events-mode=structured
mp.messaging.incoming.vending-machine.cloud-events-mode=binary
mp.messaging.incoming.vending-machine.cloud-events-source=section-cmd
mp.messaging.incoming.vending-machine.failure-strategy=dead-letter-queue
mp.messaging.incoming.vending-machine.dead-letter-queue.topic=dead-letter-machine

mp.messaging.incoming.dead-letter-machine.connector=smallrye-kafka

R payment-cmd/src/main/java/com/vm/payment/cmd/configuration/ApplicationApiConfiguration.java => payment-cmd/src/main/java/com/vm/payment/cmd/configuration/ApiConfiguration.java +1 -1
@@ 4,5 4,5 @@ import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;

@ApplicationPath("/cmd-api")
public class ApplicationApiConfiguration extends Application {
public class ApiConfiguration extends Application {
}

R payment-cmd/src/main/java/com/vm/payment/cmd/configuration/Configuration.java => payment-cmd/src/main/java/com/vm/payment/cmd/configuration/CommandConfiguration.java +3 -5
@@ 2,9 2,7 @@ package com.vm.payment.cmd.configuration;

import com.cqrs.core.infrastructure.CommandDispatcher;
import com.vm.payment.cmd.commands.CommandHandler;
import com.vm.payment.cmd.commands.DebitChange;
import com.vm.payment.cmd.commands.LoadChangeTubes;
import com.vm.payment.cmd.commands.ProcessPayment;
import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.PostConstruct;


@@ 14,12 12,12 @@ import org.jboss.logging.Logger;

@Unremovable
@ApplicationScoped
public class Configuration {
  private final Logger LOGGER = Logger.getLogger(Configuration.class);
public class CommandConfiguration {
  private final Logger LOGGER = Logger.getLogger(CommandConfiguration.class);
  private final CommandDispatcher commandDispatcher;
  private final CommandHandler commandHandler;

  public Configuration(CommandDispatcher commandDispatcher, CommandHandler commandHandler) {
  public CommandConfiguration(CommandDispatcher commandDispatcher, CommandHandler commandHandler) {
    this.commandDispatcher = commandDispatcher;
    this.commandHandler = commandHandler;
  }