M frontend/src/components/TheNewMachineForm.vue => frontend/src/components/TheNewMachineForm.vue +2 -3
@@ 67,10 67,9 @@ const socketState = websocketStore()
socketState.init(machineId)
socketState.$subscribe((_, state) => {
- const isAckReceived = state.ackedMessages.some((msg) => msg === messageId)
- if (isAckReceived) {
+ if (state.machine.machineId) {
isLoading.value = false
- machineState.loadNewMachine(machineId, machineName.value)
+ machineState.loadNewMachine(state.machine)
}
})
M frontend/src/stores/vending-machine.ts => frontend/src/stores/vending-machine.ts +4 -6
@@ 72,13 72,11 @@ export const vendingMachineStore = defineStore('vending-machine', {
const dto = { machineId, machineName, messageId }
await MACHINE_CMD_API.post("/machine", dto)
},
- loadNewMachine(machineId: string, machineName: string) {
- this.machine.machineId = machineId
- sessionStorage.setItem("machineId", machineId)
- this.machine.machineName = machineName
- sessionStorage.setItem("machineName", machineName)
+ loadNewMachine(machine: Machine) {
+ this.machine = machine
+ sessionStorage.setItem("machineId", machine.machineId)
+ sessionStorage.setItem("machineName", machine.machineName)
- this.machine.sections = []
for (let i = 1; i <= MAX_SECTION; i++) {
this.machine.sections.push(new BeverageSection(i));
}
M frontend/src/stores/websocket-store.ts => frontend/src/stores/websocket-store.ts +8 -5
@@ 1,19 1,23 @@
import { defineStore } from 'pinia'
+import { Machine } from '@/types';
const WS_URL = 'ws://localhost:5001/query-api/machine/';
export const websocketStore = defineStore('websocket-store', {
state: () => ({
- ackedMessages: [] as string[],
+ machine: {} as Machine,
webSocket: {} as WebSocket,
connected: false,
}),
getters: {
- acks: (state) => state.ackedMessages,
- machineId: () => sessionStorage.getItem("machineId")!,
+ updatedMachine: (state) => state.machine,
+ machineId: (state) => state.machine.machineId,
},
actions: {
init(machineId: string) {
+ if (!machineId) {
+ return
+ }
this.webSocket = new WebSocket(WS_URL + machineId)
this.webSocket.onopen = this.onOpen
@@ 32,8 36,7 @@ export const websocketStore = defineStore('websocket-store', {
},
onMessage(event: MessageEvent) {
console.log("Websocket message received", event.data)
- this.ackedMessages.push(event.data)
- console.log("***** debug ackedMessages", this.ackedMessages)
+ this.machine = JSON.parse(event.data)
this.connected = false
},
onError(event: Event) {
M machine-cmd/src/main/resources/application.properties => machine-cmd/src/main/resources/application.properties +1 -1
@@ 5,5 5,5 @@ quarkus.http.port=5000
mp.messaging.outgoing.vending-machine.connector=smallrye-kafka
mp.messaging.outgoing.vending-machine.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.vending-machine.cloud-events-mode=structured
-mp.messaging.outgoing.vending-machine.cloud-events-source=section-cmd
+mp.messaging.outgoing.vending-machine.cloud-events-source=machine-cmd
mp.messaging.outgoing.vending-machine.cloud-events-subject=Topic for vending machine related events
M machine-query/src/main/java/com/vm/machine/query/domain/model/VendingMachineModel.java => machine-query/src/main/java/com/vm/machine/query/domain/model/VendingMachineModel.java +0 -4
@@ 21,8 21,4 @@ public class VendingMachineModel extends PanacheEntityBase {
public VendingMachineModel() {
}
-
- public VendingMachineModel(String id) {
- this.id = id;
- }
}
M machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/EventHandler.java => machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/EventHandler.java +3 -2
@@ 2,8 2,9 @@ package com.vm.machine.query.domain.queries.infra;
import com.common.events.SectionCreatedEvent;
import com.common.events.VendingMachineCreatedEvent;
+import com.vm.machine.query.domain.model.VendingMachineModel;
public interface EventHandler {
- void on(SectionCreatedEvent event);
- void on(VendingMachineCreatedEvent e);
+ VendingMachineModel on(SectionCreatedEvent event);
+ VendingMachineModel on(VendingMachineCreatedEvent e);
}
M machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventHandler.java => machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventHandler.java +8 -3
@@ 11,7 11,7 @@ import jakarta.transaction.Transactional;
public class VendingMachineEventHandler implements EventHandler{
@Transactional
@Override
- public void on(SectionCreatedEvent event) {
+ public VendingMachineModel on(SectionCreatedEvent event) {
var sectionModel = new SectionModel();
sectionModel.machine = VendingMachineModel.findById(event.getAggregateId());;
sectionModel.sectionNumber = event.getSectionNumber();
@@ 21,13 21,18 @@ public class VendingMachineEventHandler implements EventHandler{
sectionModel.setupDate = event.getSetupDate();
sectionModel.persist();
+
+ return sectionModel.machine;
}
@Transactional
- @Override public void on(VendingMachineCreatedEvent event) {
- var vmModel = new VendingMachineModel(event.getAggregateId());
+ @Override public VendingMachineModel on(VendingMachineCreatedEvent event) {
+ var vmModel = new VendingMachineModel();
+ vmModel.id = event.getAggregateId();
vmModel.name = event.getMachineName();
vmModel.persist();
+
+ return vmModel;
}
}
M machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventsConsumer.java => machine-query/src/main/java/com/vm/machine/query/domain/queries/infra/VendingMachineEventsConsumer.java +13 -10
@@ 1,12 1,13 @@
package com.vm.machine.query.domain.queries.infra;
+import com.common.dto.VendingMachineDTO;
import com.common.events.SectionCreatedEvent;
import com.common.events.VendingMachineCreatedEvent;
+import com.vm.machine.query.mappers.VendingMachineModelMapper;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.impl.ce.DefaultIncomingKafkaCloudEventMetadata;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
-import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
@@ 15,16 16,18 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class VendingMachineEventsConsumer implements EventConsumer {
private final EventHandler eventHandler;
+ private final VendingMachineModelMapper mapper;
- public VendingMachineEventsConsumer(EventHandler eventHandler) {
+ public VendingMachineEventsConsumer(EventHandler eventHandler, VendingMachineModelMapper mapper) {
this.eventHandler = eventHandler;
+ this.mapper = mapper;
}
@Blocking(ordered = false)
@Incoming("vending-machine")
@Outgoing("frontend-ack")
- @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
- public String process(Message<JsonObject> event) throws ClassNotFoundException {
+ @Acknowledgment(Acknowledgment.Strategy.MANUAL)
+ public VendingMachineDTO process(Message<JsonObject> event) throws ClassNotFoundException {
var incomingKafkaCloudEventMetadata = event.getMetadata(DefaultIncomingKafkaCloudEventMetadata.class);
if(incomingKafkaCloudEventMetadata.isEmpty()) {
throw new IllegalArgumentException("The event does not have Event Cloud metadata.");
@@ 38,14 41,14 @@ public class VendingMachineEventsConsumer implements EventConsumer {
return switch (parsedEvent) {
case VendingMachineCreatedEvent e -> {
- eventHandler.on(e);
- //event.ack();
- yield e.getMessageId();
+ var machine = eventHandler.on(e);
+ event.ack();
+ yield mapper.toMachineDTO(machine);
}
case SectionCreatedEvent e -> {
- eventHandler.on(e);
- //event.ack();
- yield e.getMessageId();
+ var machine = eventHandler.on(e);
+ event.ack();
+ yield mapper.toMachineDTO(machine);
}
default -> throw new IllegalArgumentException("Unknown event: " + parsedEvent.getClass().getTypeName());
};
A machine-query/src/main/java/com/vm/machine/query/encoders/MachineDtoEncoder.java => machine-query/src/main/java/com/vm/machine/query/encoders/MachineDtoEncoder.java +28 -0
@@ 0,0 1,28 @@
+package com.vm.machine.query.encoders;
+
+import com.common.dto.VendingMachineDTO;
+import jakarta.json.bind.Jsonb;
+import jakarta.json.bind.JsonbBuilder;
+import jakarta.websocket.EncodeException;
+import jakarta.websocket.Encoder;
+import jakarta.websocket.EndpointConfig;
+import java.util.List;
+
+public class MachineDtoEncoder implements Encoder.Text<VendingMachineDTO> {
+ private static final Jsonb jsonb = JsonbBuilder.create();
+ @Override public String encode(VendingMachineDTO dto) throws EncodeException {
+ if(dto.getSections() == null) {
+ dto.setSections(List.of());
+ }
+
+ return jsonb.toJson(dto);
+ }
+
+ @Override public void init(EndpointConfig config) {
+ Text.super.init(config);
+ }
+
+ @Override public void destroy() {
+ Text.super.destroy();
+ }
+}
R machine-query/src/main/java/com/vm/machine/query/endpoints/SectionQueryEndpoint.java => machine-query/src/main/java/com/vm/machine/query/endpoints/MachineQueryEndpoint.java +3 -4
@@ 9,18 9,17 @@ import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;
-import java.util.List;
import org.jboss.logging.Logger;
@ApplicationScoped
@Path("")
-public class SectionQueryEndpoint {
- private static final Logger LOGGER = Logger.getLogger(SectionQueryEndpoint.class);
+public class MachineQueryEndpoint {
+ private static final Logger LOGGER = Logger.getLogger(MachineQueryEndpoint.class);
private final QueryDispatcher queryDispatcher;
private final VendingMachineModelMapper mapper;
- public SectionQueryEndpoint(QueryDispatcher queryDispatcher, VendingMachineModelMapper mapper) {
+ public MachineQueryEndpoint(QueryDispatcher queryDispatcher, VendingMachineModelMapper mapper) {
this.queryDispatcher = queryDispatcher;
this.mapper = mapper;
}
M machine-query/src/main/java/com/vm/machine/query/endpoints/WebsocketEndpoint.java => machine-query/src/main/java/com/vm/machine/query/endpoints/WebsocketEndpoint.java +38 -11
@@ 1,5 1,7 @@
package com.vm.machine.query.endpoints;
+import com.common.dto.VendingMachineDTO;
+import com.vm.machine.query.encoders.MachineDtoEncoder;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
@@ 8,12 10,13 @@ import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import jakarta.ws.rs.Path;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
-@ServerEndpoint("/query-api/machine/{machineId}")
+@ServerEndpoint(value = "/query-api/machine/{machineId}", encoders = MachineDtoEncoder.class)
@ApplicationScoped
public class WebsocketEndpoint {
private static final Logger LOGGER = Logger.getLogger(WebsocketEndpoint.class);
@@ 21,23 24,47 @@ public class WebsocketEndpoint {
@OnOpen
public void onOpen(Session session, @PathParam("machineId") String machineId) {
+ if(machineId == null || machineId.isEmpty()) {
+ return;
+ }
+
+ var existentSession = sessions.get(machineId);
+ if(existentSession != null) {
+ if(!existentSession.isOpen()) {
+ sessions.remove(machineId);
+ }else {
+ LOGGER.info("A session with the machine " + machineId + " already exists.");
+ return;
+ }
+ }
LOGGER.info("Machine " + machineId + " joined. Session ID: " + session.getId());
- sessions.putIfAbsent(session.getId(), session);
+ sessions.putIfAbsent(machineId, session);
}
- @OnMessage
+ //@OnMessage
@Incoming("frontend-ack")
- public void broadcast(String messageId){
- if(messageId == null || messageId.isEmpty()) {
+ public void pushDto(VendingMachineDTO dto){
+ if(dto == null) {
+ return;
+ }
+
+ var session = sessions.get(dto.getMachineId());
+
+ if(session == null) {
+ LOGGER.error("No session found for machine ID " + dto.getMachineId());
+ return;
+ }
+
+ if(!session.isOpen()){
+ LOGGER.error("The session closed with the machine ID " + dto.getMachineId());
+ sessions.remove(dto.getMachineId());
return;
}
- sessions.values().forEach(s -> {
- s.getAsyncRemote().sendObject(messageId, result -> {
- if (result.getException() != null) {
- LOGGER.error("Unable to send websocket message. Session ID " + s.getId(), result.getException());
- }
- });
+ session.getAsyncRemote().sendObject(dto, result -> {
+ if (result.getException() != null) {
+ LOGGER.error("Unable to send websocket message. Machine ID " + dto.getMachineId(), result.getException());
+ }
});
}
M machine-query/src/main/resources/application.properties => machine-query/src/main/resources/application.properties +2 -2
@@ 13,7 13,7 @@ quarkus.hibernate-orm.database.generation = update
%prod.kafka.bootstrap.servers=kafka:9092
mp.messaging.incoming.vending-machine.connector=smallrye-kafka
#mp.messaging.incoming.vending-machine.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
-mp.messaging.incoming.vending-machine.auto.offset.reset=latest
-mp.messaging.incoming.vending-machine.group.id=section-projection
+mp.messaging.incoming.vending-machine.auto.offset.reset=earliest
+mp.messaging.incoming.vending-machine.group.id=machine-projection
mp.messaging.incoming.vending-machine.cloud-events-mode=structured
mp.messaging.incoming.vending-machine.cloud-events-source=section-cmd