~singpolyma/sgx-jmp

7d506e05e7d5e0a9e1c20c65bf2da4743caccd15 — Stephen Paul Weber 3 years ago e5b66f4
Port in inbound calls + voicemail

The craziest part of this is the workaround for a serious bug in Bandwidth's
HTTP voice API (which they may yet fix, still negotiating with them about that).

When a call comes in, every 10 seconds that it is not "answered" the inbound
call gets cancelled by their upstream peer and then get retried.  The caller
sees only one oubound call for this, so it doesn't look odd to them, but to us
it looks like they keep hanging up and trying again every 10 seconds.  So what
we do for now is we wait 2 seconds after they disconnect before we decide
they're really gone.  If they call back in those 2 seconds we just connect the
eventual bridge or voicemail to this new call and everything works out.

Ew.
M .rubocop.yml => .rubocop.yml +3 -0
@@ 15,6 15,9 @@ Metrics/MethodLength:
    - test/*

Metrics/BlockLength:
  ExcludedMethods:
    - route
    - "on"
  Exclude:
    - test/*


M Gemfile => Gemfile +1 -0
@@ 3,6 3,7 @@
source "https://rubygems.org"

gem "amazing_print"
gem "bandwidth-sdk"
gem "blather", git: "https://github.com/singpolyma/blather.git", branch: "ergonomics"
gem "braintree"
gem "dhall"

M config-schema.dhall => config-schema.dhall +1 -0
@@ 43,6 43,7 @@
, sgx : Text
, sip_host : Text
, upstream_domain : Text
, web : < Inet : { interface : Text, port : Natural } | Unix : Text >
, web_register : { from : Text, to : Text }
, xep0157 : List { label : Text, value : Text, var : Text }
}

M config.dhall.sample => config.dhall.sample +3 -0
@@ 1,3 1,5 @@
let ListenOn = < Inet: { interface: Text, port: Natural } | Unix: Text >
in
{
	component = {
		jid = "component.localhost",


@@ 8,6 10,7 @@
		port = 5347
	},
	sgx = "component2.localhost",
	web = ListenOn.Inet { interface = "::1", port = env:PORT ? 8080 },
	creds = {
		account = "00000",
		username = "dashboard user",

A lib/roda_capture.rb => lib/roda_capture.rb +37 -0
@@ 0,0 1,37 @@
# frozen-string-literal: true

# Build from official params_capturing plugin
class RodaCapture
	module RequestMethods
		def captures_hash
			@captures_hash ||= {}
		end

	private

		# Add the symbol to the list of capture names if capturing
		def _match_symbol(sym)
			@_sym_captures << sym if @_sym_captures

			super
		end

		# If all arguments are strings or symbols, turn on param capturing during
		# the matching, but turn it back off before yielding to the block.	Add
		# any captures to the params based on the param capture names added by
		# the matchers.
		def if_match(args)
			@_sym_captures = [] if args.all? { |x| x.is_a?(Symbol) }

			super do |*a|
				if @_sym_captures
					@_sym_captures.zip(a).each do |k, v|
						captures_hash[k] = v
					end
					@_sym_captures = nil
				end
				yield(*a)
			end
		end
	end
end

A public/beep.mp3 => public/beep.mp3 +0 -0
M sgx_jmp.rb => sgx_jmp.rb +7 -1
@@ 52,6 52,12 @@ CONFIG =
		"(#{ARGV[0]}) : #{__dir__}/config-schema.dhall",
		transform_keys: ->(k) { k&.to_sym }
	)
WEB_LISTEN =
	if CONFIG[:web].is_a?(Hash)
		[CONFIG[:web][:interface], CONFIG[:web][:port]]
	else
		[CONFIG[:web]]
	end

singleton_class.class_eval do
	include Blather::DSL


@@ 191,7 197,7 @@ when_ready do
		self << ping
	end

	Web.run(LOG.child, CustomerRepo.new)
	Web.run(LOG.child, CustomerRepo.new, *WEB_LISTEN)
end

# workqueue_count MUST be 0 or else Blather uses threads!

A views/bridge.slim => views/bridge.slim +3 -0
@@ 0,0 1,3 @@
doctype xml
Response
	Bridge= call_id

A views/pause.slim => views/pause.slim +3 -0
@@ 0,0 1,3 @@
doctype xml
Response
	Pause duration=duration

A views/redirect.slim => views/redirect.slim +3 -0
@@ 0,0 1,3 @@
doctype xml
Response
	Redirect redirectUrl=to /

A views/voicemail.slim => views/voicemail.slim +10 -0
@@ 0,0 1,10 @@
doctype xml
Response
	Pause duration=2
	== render(*ogm.to_render)
	PlayAudio= "/beep.mp3"
	Record{
		transcribe="true"
		recordingAvailableUrl="/inbound/calls/#{pseudo_call_id}/voicemail/audio"
		transcriptionAvailableUrl="/inbound/calls/#{pseudo_call_id}/voicemail/transcription"
		fileFormat="mp3"} /

A views/voicemail_ogm_media.slim => views/voicemail_ogm_media.slim +1 -0
@@ 0,0 1,1 @@
PlayAudio= url

A views/voicemail_ogm_tts.slim => views/voicemail_ogm_tts.slim +4 -0
@@ 0,0 1,4 @@
SpeakSentence
	' You have reached the voicemail of
	= fn
	| . Please send a text message, or leave a message after the tone.

M web.rb => web.rb +308 -22
@@ 1,61 1,344 @@
# frozen_string_literal: true

require "digest"
require "forwardable"
require "roda"
require "thin"
require "sentry-ruby"
require "bandwidth"

Faraday.default_adapter = :em_synchrony

require_relative "lib/cdr"
require_relative "lib/roda_capture"
require_relative "lib/roda_em_promise"
require_relative "lib/rack_fiber"

BANDWIDTH_VOICE = Bandwidth::Client.new(
	voice_basic_auth_user_name: CONFIG[:creds][:username],
	voice_basic_auth_password: CONFIG[:creds][:password]
).voice_client.client

module CustomerFwd
	def self.from_redis(redis, customer, tel)
		EMPromise.all([
			redis.get("catapult_fwd-#{tel}"),
			customer.fwd_timeout
		]).then do |(fwd, stimeout)|
			timeout = Timeout.new(stimeout)
			next if !fwd || timeout.zero?
			self.for(fwd, timeout)
		end
	end

	def self.for(uri, timeout)
		case uri
		when /^tel:/
			Tel.new(uri, timeout)
		when /^sip:/
			SIP.new(uri, timeout)
		when /^xmpp:/
			XMPP.new(uri, timeout)
		else
			raise "Unknown forward URI: #{uri}"
		end
	end

	class Timeout
		def initialize(s)
			@timeout = s.nil? || s.to_i.negative? ? 300 : s.to_i
		end

		def zero?
			@timeout.zero?
		end

		def to_i
			@timeout
		end
	end

	class Tel
		attr_reader :timeout

		def initialize(uri, timeout)
			@tel = uri.sub(/^tel:/, "")
			@timeout = timeout
		end

		def to
			@tel
		end
	end

	class SIP
		attr_reader :timeout

		def initialize(uri, timeout)
			@uri = uri
			@timeout = timeout
		end

		def to
			@uri
		end
	end

	class XMPP
		attr_reader :timeout

		def initialize(uri, timeout)
			@jid = uri.sub(/^xmpp:/, "")
			@timeout = timeout
		end

		def to
			"sip:#{ERB::Util.url_encode(@jid)}@sip.cheogram.com"
		end
	end
end

# rubocop:disable Metrics/ClassLength
class Web < Roda
	use Rack::Fiber # Must go first!
	use Sentry::Rack::CaptureExceptions
	plugin :json_parser
	plugin :public
	plugin :render, engine: "slim"
	plugin RodaCapture
	plugin RodaEMPromise # Must go last!

	class << self
		attr_reader :customer_repo, :log
	end
		attr_reader :true_inbound_call, :outbound_transfers

	def customer_repo
		Web.customer_repo
		def run(log, customer_repo, *listen_on)
			plugin :common_logger, log, method: :info
			@customer_repo = customer_repo
			@true_inbound_call = {}
			@outbound_transfers = {}
			Thin::Logging.logger = log
			Thin::Server.start(
				*listen_on,
				freeze.app,
				signals: false
			)
		end
	end

	extend Forwardable
	def_delegators :'self.class', :customer_repo, :true_inbound_call,
	               :outbound_transfers
	def_delegators :request, :params

	def log
		Web.log
		opts[:common_logger]
	end

	def log_error(e)
		log.error(
			"Error raised during #{request.full_path}: #{e.class}",
			e,
			loggable_params
		)
		if e.is_a?(::Exception)
			Sentry.capture_exception(e)
		else
			Sentry.capture_message(e.to_s)
		end
	end

	def loggable_params
		params.dup.tap do |p|
			p.delete("to")
			p.delete("from")
		end
	end

	def pseudo_call_id
		request.captures_hash[:pseudo_call_id] ||
			Digest::SHA256.hexdigest("#{params['from']},#{params['to']}")
	end

	def params
		request.params
	TEL_CANDIDATES = {
		"Restricted" => "14",
		"anonymous" => "15",
		"Anonymous" => "16",
		"unavailable" => "17",
		"Unavailable" => "18"
	}.freeze

	def sanitize_tel_candidate(candidate)
		if candidate.length < 3
			"13;phone-context=anonymous.phone-context.soprani.ca"
		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
			candidate
		elsif candidate == "Restricted"
			TEL_CANDIDATES.fetch(candidate, "19") +
				";phone-context=anonymous.phone-context.soprani.ca"
		end
	end

	def from_jid
		Blather::JID.new(
			sanitize_tel_candidate(params["from"]),
			CONFIG[:component][:jid]
		)
	end

	def inbound_calls_path(suffix)
		["/inbound/calls/#{pseudo_call_id}", suffix].compact.join("/")
	end

	def url(path)
		"#{request.base_url}#{path}"
	end

	def self.run(log, customer_repo)
		plugin :common_logger, log, method: :info
		@log = log
		@customer_repo = customer_repo
		Thin::Logging.logger = log
		Thin::Server.start(
			"::1",
			ENV.fetch("PORT", 8080),
			freeze.app,
			signals: false
	def modify_call(call_id)
		body = Bandwidth::ApiModifyCallRequest.new
		yield body
		BANDWIDTH_VOICE.modify_call(
			CONFIG[:creds][:account],
			call_id,
			body: body
		)
	end

	route do |r|
		r.on "outbound" do
		r.on "inbound" do
			r.on "calls" do
				r.post "status" do
					loggable = params.dup.tap { |p| p.delete("to") }
					log.info "#{params['eventType']} #{params['callId']}", loggable
					if params["eventType"] == "disconnect"
						CDR.for_disconnect(params).save.catch do |e|
							log.error("Error raised during /outbound/calls/status", e, loggable)
							Sentry.capture_exception(e)
						p_call_id = pseudo_call_id
						call_id = params["callId"]
						EM.promise_timer(2).then {
							next unless true_inbound_call[p_call_id] == call_id
							true_inbound_call.delete(p_call_id)

							if (outbound_leg = outbound_transfers.delete(p_call_id))
								modify_call(outbound_leg) do |call|
									call.state = "completed"
								end
							end

							customer_repo.find_by_tel(params["to"]).then do |customer|
								CDR.for_inbound(customer.customer_id, params).save
							end
						}.catch(&method(:log_error))
					end
					"OK"
				end

				r.on :pseudo_call_id do |pseudo_call_id|
					r.post "transfer_complete" do
						outbound_leg = outbound_transfers.delete(pseudo_call_id)
						if params["cause"] == "hangup"
							log.debug "Normal hangup", loggable_params
						elsif !outbound_leg
							log.debug "Inbound disconnected", loggable_params
						else
							log.debug "Go to voicemail", loggable_params
							true_call_id = true_inbound_call[pseudo_call_id]
							modify_call(true_call_id) do |call|
								call.redirect_url = url inbound_calls_path(:voicemail)
							end
						end
						""
					end

					r.on "voicemail" do
						r.post "audio" do
							duration = Time.parse(params["endTime"]) -
							           Time.parse(params["startTime"])
							next "OK<5" unless duration > 5

							jmp_media_url = params["mediaUrl"].sub(
								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
								"https://jmp.chat"
							)

							customer_repo.find_by_tel(params["to"]).then do |customer|
								m = Blather::Stanza::Message.new
								m.chat_state = nil
								m.from = from_jid
								m.subject = "New Voicemail"
								m.body = jmp_media_url
								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
								customer.stanza_to(m)

								"OK"
							end
						end

						r.post "transcription" do
							customer_repo.find_by_tel(params["to"]).then do |customer|
								m = Blather::Stanza::Message.new
								m.chat_state = nil
								m.from = from_jid
								m.subject = "Voicemail Transcription"
								m.body = BANDWIDTH_VOICE.get_recording_transcription(
									params["accountId"], params["callId"], params["recordingId"]
								).data.transcripts[0].text
								customer.stanza_to(m)

								"OK"
							end
						end

						r.post do
							customer_repo
								.find_by_tel(params["to"])
								.then { |customer| customer.ogm(params["from"]) }
								.then do |ogm|
									render :voicemail, locals: { ogm: ogm }
								end
						end
					end

					r.post do
						true_call_id = true_inbound_call[pseudo_call_id]
						render :bridge, locals: { call_id: true_call_id }
					end
				end

				r.post do
					if true_inbound_call[pseudo_call_id]
						true_inbound_call[pseudo_call_id] = params["callId"]
						return render :pause, locals: { duration: 300 }
					end

					customer_repo.find_by_tel(params["to"]).then do |customer|
						CustomerFwd.from_redis(::REDIS, customer, params["to"]).then do |fwd|
							if fwd
								body = Bandwidth::ApiCreateCallRequest.new.tap do |cc|
									cc.to = fwd.to
									cc.from = params["from"]
									cc.application_id = params["applicationId"]
									cc.call_timeout = fwd.timeout.to_i
									cc.answer_url = url inbound_calls_path(nil)
									cc.disconnect_url = url inbound_calls_path(:transfer_complete)
								end
								true_inbound_call[pseudo_call_id] = params["callId"]
								outbound_transfers[pseudo_call_id] = BANDWIDTH_VOICE.create_call(
									CONFIG[:creds][:account], body: body
								).data.call_id
								render :pause, locals: { duration: 300 }
							else
								render :redirect, locals: { to: inbound_calls_path(:voicemail) }
							end
						end
					end
				end
			end
		end

		r.on "outbound" do
			r.on "calls" do
				r.post "status" do
					log.info "#{params['eventType']} #{params['callId']}", loggable_params
					if params["eventType"] == "disconnect"
						CDR.for_outbound(params).save.catch(&method(:log_error))
					end
					"OK"
				end



@@ 70,5 353,8 @@ class Web < Roda
				end
			end
		end

		r.public
	end
end
# rubocop:enable Metrics/ClassLength