From 85a7a4e90374801878f71518b548aeb5a00c528e Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Tue, 19 Apr 2022 14:07:07 -0500 Subject: [PATCH] Limit call concurrency for outbound SIP --- lib/call_attempt.rb | 13 ++++++------- lib/call_attempt_repo.rb | 38 ++++++++++++++++++++++++++++++++------ lib/trust_level.rb | 12 ++++++------ test/test_helper.rb | 25 +++++++++++++++++++++++++ web.rb | 2 ++ 5 files changed, 71 insertions(+), 19 deletions(-) diff --git a/lib/call_attempt.rb b/lib/call_attempt.rb index 5a4e1d5..c08f443 100644 --- a/lib/call_attempt.rb +++ b/lib/call_attempt.rb @@ -6,13 +6,12 @@ require_relative "tts_template" require_relative "low_balance" class CallAttempt - def self.for(customer, rate, usage, trust_level, direction:, **kwargs) - kwargs.merge!(direction: direction) + def self.for(customer, rate, usage, supported, **kwargs) credit = [customer.minute_limit.to_d - usage, 0].max + customer.balance - if !rate || !trust_level.support_call?(rate) - Unsupported.new(direction: direction) + if !supported + Unsupported.new(**kwargs.slice(:direction)) elsif credit < rate * 10 - NoBalance.for(customer, rate, usage, trust_level, **kwargs) + NoBalance.for(customer, rate, usage, supported, **kwargs) else for_ask_or_go(customer, rate, usage, credit, **kwargs) end @@ -97,12 +96,12 @@ class CallAttempt end class NoBalance - def self.for(customer, rate, usage, trust_level, direction:, **kwargs) + def self.for(customer, rate, usage, supported, direction:, **kwargs) LowBalance.for(customer).then(&:notify!).then do |amount| if amount&.positive? CallAttempt.for( customer.with_balance(customer.balance + amount), - rate, usage, trust_level, direction: direction, **kwargs + rate, usage, supported, direction: direction, **kwargs ) else NoBalance.new(balance: customer.balance, direction: direction) diff --git a/lib/call_attempt_repo.rb b/lib/call_attempt_repo.rb index c3b2c4f..5e34cd9 100644 --- a/lib/call_attempt_repo.rb +++ b/lib/call_attempt_repo.rb @@ -34,20 +34,46 @@ class CallAttemptRepo ) end + def starting_call(customer, call_id) + redis.sadd( + "jmp_customer_ongoing_calls-#{customer.customer_id}", + call_id + ).then do + redis.expire( + "jmp_customer_ongoing_calls-#{customer.customer_id}", + 60 * 60 + ) + end + end + + def ending_call(customer, call_id) + redis.srem( + "jmp_customer_ongoing_calls-#{customer.customer_id}", + call_id + ) + end + protected def find(customer, other_tel, direction:, **kwargs) - EMPromise.all([ - find_rate(customer.plan_name, other_tel, direction), - find_usage(customer.customer_id), - TrustLevelRepo.new(db: db, redis: redis).find(customer) - ]).then do |(rate, usage, trust_level)| + find_all(customer, other_tel, direction).then do |(rate, usage, tl, c)| CallAttempt.for( - customer, rate, usage, trust_level, direction: direction, **kwargs + customer, rate, usage, + rate && tl.support_call?(rate, c || 0), + direction: direction, **kwargs ) end end + def find_all(customer, other_tel, direction) + EMPromise.all([ + find_rate(customer.plan_name, other_tel, direction), + find_usage(customer.customer_id), + TrustLevelRepo.new(db: db, redis: redis).find(customer), + redis.scard("jmp_customer_ongoing_calls-#{customer.customer_id}") + ]) + end + def find_usage(customer_id) db.query_one(<<~SQL, customer_id, default: { a: 0 }).then { |r| r[:a] } SELECT COALESCE(SUM(charge), 0) AS a FROM cdr_with_charge diff --git a/lib/trust_level.rb b/lib/trust_level.rb index e08036f..285850f 100644 --- a/lib/trust_level.rb +++ b/lib/trust_level.rb @@ -34,8 +34,8 @@ module TrustLevel new if manual == "Basement" || (!manual && settled_amount < 10) end - def support_call?(rate) - rate <= 0.02 + def support_call?(rate, concurrency) + rate <= 0.02 && concurrency < 1 end end @@ -44,8 +44,8 @@ module TrustLevel new if manual == "Paragon" || (!manual && settled_amount > 60) end - def support_call?(*) - true + def support_call?(_, concurrency) + concurrency < 10 end end @@ -67,8 +67,8 @@ module TrustLevel @max_rate = EXPENSIVE_ROUTE.fetch(plan_name, 0.1) end - def support_call?(rate) - rate <= @max_rate + def support_call?(rate, concurrency) + rate <= @max_rate && concurrency < 4 end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index e71357c..5b34d26 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -199,6 +199,31 @@ class FakeRedis get(key).then { |v| v.to_i.to_s(2)[bit].to_i } end + def hget(key, field) + @values.dig(key, field) + end + + def hincrby(key, field, incrby) + @values[key] ||= {} + @values[key][field] ||= 0 + @values[key][field] += incrby + end + + def sadd(key, member) + @values[key] ||= Set.new + @values[key] << member + end + + def srem(key, member) + @values[key].delete(member) + end + + def scard(key) + @values[key]&.size || 0 + end + + def expire(_, _); end + def exists(*keys) EMPromise.resolve( @values.select { |k, _| keys.include? k }.size diff --git a/web.rb b/web.rb index 183746b..f8051cd 100644 --- a/web.rb +++ b/web.rb @@ -299,6 +299,7 @@ class Web < Roda r.post "status" do log.info "#{params['eventType']} #{params['callId']}", loggable_params if params["eventType"] == "disconnect" + call_attempt_repo.ending_call(c, params["callId"]) CDR.for_outbound(params).save.catch(&method(:log_error)) end "OK" @@ -317,6 +318,7 @@ class Web < Roda ).then do |ca| r.json { ca.to_json } + call_attempt_repo.starting_call(c, params["callId"]) render(*ca.to_render) end end -- 2.45.2