~singpolyma/sgx-jmp

0d1a2003e184bf6614ae3774e5ef5192dd6be155 — Stephen Paul Weber 2 years ago b47b7d0 + 85a7a4e
Merge branch 'sip-concurrency'

* sip-concurrency:
  Limit call concurrency for outbound SIP
5 files changed, 71 insertions(+), 19 deletions(-)

M lib/call_attempt.rb
M lib/call_attempt_repo.rb
M lib/trust_level.rb
M test/test_helper.rb
M web.rb
M lib/call_attempt.rb => lib/call_attempt.rb +6 -7
@@ 6,13 6,12 @@ require_relative "tts_template"
require_relative "low_balance"

class CallAttempt
	def self.for(customer, rate, usage, trust_level, direction:, **kwargs)
		kwargs.merge!(direction: direction)
	def self.for(customer, rate, usage, supported, **kwargs)
		credit = [customer.minute_limit.to_d - usage, 0].max + customer.balance
		if !rate || !trust_level.support_call?(rate)
			Unsupported.new(direction: direction)
		if !supported
			Unsupported.new(**kwargs.slice(:direction))
		elsif credit < rate * 10
			NoBalance.for(customer, rate, usage, trust_level, **kwargs)
			NoBalance.for(customer, rate, usage, supported, **kwargs)
		else
			for_ask_or_go(customer, rate, usage, credit, **kwargs)
		end


@@ 97,12 96,12 @@ class CallAttempt
	end

	class NoBalance
		def self.for(customer, rate, usage, trust_level, direction:, **kwargs)
		def self.for(customer, rate, usage, supported, direction:, **kwargs)
			LowBalance.for(customer).then(&:notify!).then do |amount|
				if amount&.positive?
					CallAttempt.for(
						customer.with_balance(customer.balance + amount),
						rate, usage, trust_level, direction: direction, **kwargs
						rate, usage, supported, direction: direction, **kwargs
					)
				else
					NoBalance.new(balance: customer.balance, direction: direction)

M lib/call_attempt_repo.rb => lib/call_attempt_repo.rb +32 -6
@@ 34,20 34,46 @@ class CallAttemptRepo
		)
	end

	def starting_call(customer, call_id)
		redis.sadd(
			"jmp_customer_ongoing_calls-#{customer.customer_id}",
			call_id
		).then do
			redis.expire(
				"jmp_customer_ongoing_calls-#{customer.customer_id}",
				60 * 60
			)
		end
	end

	def ending_call(customer, call_id)
		redis.srem(
			"jmp_customer_ongoing_calls-#{customer.customer_id}",
			call_id
		)
	end

protected

	def find(customer, other_tel, direction:, **kwargs)
		EMPromise.all([
			find_rate(customer.plan_name, other_tel, direction),
			find_usage(customer.customer_id),
			TrustLevelRepo.new(db: db, redis: redis).find(customer)
		]).then do |(rate, usage, trust_level)|
		find_all(customer, other_tel, direction).then do |(rate, usage, tl, c)|
			CallAttempt.for(
				customer, rate, usage, trust_level, direction: direction, **kwargs
				customer, rate, usage,
				rate && tl.support_call?(rate, c || 0),
				direction: direction, **kwargs
			)
		end
	end

	def find_all(customer, other_tel, direction)
		EMPromise.all([
			find_rate(customer.plan_name, other_tel, direction),
			find_usage(customer.customer_id),
			TrustLevelRepo.new(db: db, redis: redis).find(customer),
			redis.scard("jmp_customer_ongoing_calls-#{customer.customer_id}")
		])
	end

	def find_usage(customer_id)
		db.query_one(<<~SQL, customer_id, default: { a: 0 }).then { |r| r[:a] }
			SELECT COALESCE(SUM(charge), 0) AS a FROM cdr_with_charge

M lib/trust_level.rb => lib/trust_level.rb +6 -6
@@ 34,8 34,8 @@ module TrustLevel
			new if manual == "Basement" || (!manual && settled_amount < 10)
		end

		def support_call?(rate)
			rate <= 0.02
		def support_call?(rate, concurrency)
			rate <= 0.02 && concurrency < 1
		end
	end



@@ 44,8 44,8 @@ module TrustLevel
			new if manual == "Paragon" || (!manual && settled_amount > 60)
		end

		def support_call?(*)
			true
		def support_call?(_, concurrency)
			concurrency < 10
		end
	end



@@ 67,8 67,8 @@ module TrustLevel
			@max_rate = EXPENSIVE_ROUTE.fetch(plan_name, 0.1)
		end

		def support_call?(rate)
			rate <= @max_rate
		def support_call?(rate, concurrency)
			rate <= @max_rate && concurrency < 4
		end
	end
end

M test/test_helper.rb => test/test_helper.rb +25 -0
@@ 199,6 199,31 @@ class FakeRedis
		get(key).then { |v| v.to_i.to_s(2)[bit].to_i }
	end

	def hget(key, field)
		@values.dig(key, field)
	end

	def hincrby(key, field, incrby)
		@values[key] ||= {}
		@values[key][field] ||= 0
		@values[key][field] += incrby
	end

	def sadd(key, member)
		@values[key] ||= Set.new
		@values[key] << member
	end

	def srem(key, member)
		@values[key].delete(member)
	end

	def scard(key)
		@values[key]&.size || 0
	end

	def expire(_, _); end

	def exists(*keys)
		EMPromise.resolve(
			@values.select { |k, _| keys.include? k }.size

M web.rb => web.rb +2 -0
@@ 299,6 299,7 @@ class Web < Roda
				r.post "status" do
					log.info "#{params['eventType']} #{params['callId']}", loggable_params
					if params["eventType"] == "disconnect"
						call_attempt_repo.ending_call(c, params["callId"])
						CDR.for_outbound(params).save.catch(&method(:log_error))
					end
					"OK"


@@ 317,6 318,7 @@ class Web < Roda
						).then do |ca|
							r.json { ca.to_json }

							call_attempt_repo.starting_call(c, params["callId"])
							render(*ca.to_render)
						end
					end