M lib/command.rb => lib/command.rb +7 -2
@@ 4,6 4,7 @@ require "sentry-ruby"
require "statsd-instrument"
require_relative "customer_repo"
+require_relative "session_manager"
class Command
def self.execution
@@ 27,6 28,8 @@ class Command
end
class Execution
+ class Timeout < SessionManager::Timeout; end
+
class FinalStanza
attr_reader :stanza
@@ 59,8 62,10 @@ class Command
reply.status = :executing
end
yield stanza if block_given?
- COMMAND_MANAGER.write(stanza).then do |new_iq|
+ COMMAND_MANAGER.write(stanza).then { |new_iq|
@iq = new_iq
+ }.catch_only(SessionManager::Timeout) do
+ EMPromise.reject(Timeout.new)
end
end
@@ 107,7 112,7 @@ class Command
next EMPromise.reject(iq) unless iq.cancel?
finish(status: :canceled)
- }.catch_only(FinalStanza) { |e|
+ }.catch_only(Timeout) {}.catch_only(FinalStanza) { |e|
@blather << e.stanza
}.catch do |e|
log_error(e)
A lib/session_manager.rb => lib/session_manager.rb +39 -0
@@ 0,0 1,39 @@
+# frozen_string_literal: true
+
+class SessionManager
+ class Timeout < StandardError; end
+
+ def initialize(blather, id_msg, timeout: 5, error_if: nil)
+ @blather = blather
+ @sessions = {}
+ @id_msg = id_msg
+ @timeout = timeout
+ @error_if = error_if
+ end
+
+ def promise_for(stanza)
+ id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
+ @sessions.fetch(id) do
+ @sessions[id] = EMPromise.new
+ EM.add_timer(@timeout) do
+ @sessions.delete(id)&.reject(Timeout.new)
+ end
+ @sessions[id]
+ end
+ end
+
+ def write(stanza)
+ promise = promise_for(stanza)
+ @blather << stanza
+ promise
+ end
+
+ def fulfill(stanza)
+ id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
+ if stanza.error? || @error_if&.call(stanza)
+ @sessions.delete(id)&.reject(stanza)
+ else
+ @sessions.delete(id)&.fulfill(stanza)
+ end
+ end
+end
M sgx_jmp.rb => sgx_jmp.rb +1 -36
@@ 72,6 72,7 @@ require_relative "lib/payment_methods"
require_relative "lib/registration"
require_relative "lib/transaction"
require_relative "lib/web_register_manager"
+require_relative "lib/session_manager"
require_relative "lib/statsd"
ELECTRUM = Electrum.new(**CONFIG[:electrum])
@@ 275,42 276,6 @@ message :error? do |m|
LOG.error "MESSAGE ERROR", stanza: m
end
-class SessionManager
- def initialize(blather, id_msg, timeout: 5, error_if: nil)
- @blather = blather
- @sessions = {}
- @id_msg = id_msg
- @timeout = timeout
- @error_if = error_if
- end
-
- def promise_for(stanza)
- id = "#{stanza.to.stripped}/#{stanza.public_send(@id_msg)}"
- @sessions.fetch(id) do
- @sessions[id] = EMPromise.new
- EM.add_timer(@timeout) do
- @sessions.delete(id)&.reject(:timeout)
- end
- @sessions[id]
- end
- end
-
- def write(stanza)
- promise = promise_for(stanza)
- @blather << stanza
- promise
- end
-
- def fulfill(stanza)
- id = "#{stanza.from.stripped}/#{stanza.public_send(@id_msg)}"
- if stanza.error? || @error_if&.call(stanza)
- @sessions.delete(id)&.reject(stanza)
- else
- @sessions.delete(id)&.fulfill(stanza)
- end
- end
-end
-
IQ_MANAGER = SessionManager.new(self, :id)
COMMAND_MANAGER = SessionManager.new(
self,