AI and VoIP Blog

Build a SIP Registrar using Python with KEMI framework for Kamailio


Kamailio is a free, open-source SIP server that can handle thousands of call setups per second. It is designed to scale to any size of VoIP network, from enterprise networks to carrier-grade systems. Kamailio is written in C and provides an embedded interpreter interface that lets users write their custom routing logic in several scripting languages. This interface is called the Kamailio Embedded Interface (KEMI) framework.

In my previous post, "Getting started with Kamailio Embedded Interpreter Interface (KEMI) framework using Python", we covered the basics of KEMI. This article shows how Kamailio can be used as a SIP registrar using the KEMI framework with Python.

Creating the code:
The code of the function ksr_request_route(self, msg) is given below. It is the equivalent of the request_route{} block in Kamailio’s native scripting language: Kamailio calls it for every SIP request it receives. To process REGISTER requests, it calls several helper methods, each of which returns -255 when processing of the request should stop. Let’s go through the code of each of these methods individually.

    # SIP request routing -- equivalent of request_route{}
    def ksr_request_route(self, msg):
        # KSR.info("===== request - from kamailio python script\n")
        # KSR.info("===== method [%s] r-uri [%s]\n" % (KSR.pv.get("$rm"),KSR.pv.get("$ru")))
        # per request initial checks
        if self.ksr_route_reqinit(msg)==-255 :
            return 1
        # NAT detection
        if self.ksr_route_natdetect(msg)==-255 :
            return 1
        # -- only initial requests (no To tag)
        # handle retransmissions
        if KSR.tmx.t_precheck_trans()>0 :
            KSR.tm.t_check_trans()
            return 1
        if KSR.tm.t_check_trans()==0 :
            return 1
        # authentication
        if self.ksr_route_auth(msg)==-255 :
            return 1
        # handle registrations
        if self.ksr_route_registrar(msg)==-255 :
            return 1
        if KSR.corex.has_ruri_user() < 0 :
            # request with no Username in RURI
            KSR.sl.sl_send_reply(484, "Address Incomplete")
            return 1

        # user location service
        self.ksr_route_location(msg)

        return 1
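
The -255 convention used above can be illustrated without Kamailio. The following is a minimal sketch in plain Python; the step functions reqinit, auth and registrar are hypothetical stand-ins for the ksr_route_* methods, and the sketch only shows the control flow, not real SIP handling.

```python
# Sketch only: hypothetical stand-ins for the ksr_route_* sub-routes.
STOP = -255  # value a sub-route returns when processing must end


def run_steps(steps, msg):
    """Run each step in order and stop at the first one that returns STOP.

    Returns the names of the steps that were executed.
    """
    executed = []
    for step in steps:
        executed.append(step.__name__)
        if step(msg) == STOP:
            break
    return executed


def reqinit(msg):
    # pretend all initial checks passed
    return 1


def auth(msg):
    # pretend an unauthorized request was challenged
    return 1 if msg.get("authorized") else STOP


def registrar(msg):
    # a REGISTER is fully handled here, so nothing may run after it
    return STOP if msg.get("method") == "REGISTER" else 1
```

Calling run_steps([reqinit, auth, registrar], msg) with an unauthorized REGISTER stops after auth, which is the same early-exit behaviour that the chain of `if ...==-255: return 1` checks gives the real request route.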

The first function, ksr_route_reqinit(self, msg), performs initial checks on each SIP request (msg) received by the SIP proxy and returns -255 if any of them fails. For requests that do not come from one of Kamailio’s own addresses, it rejects source IPs that are already listed in the ipban hash table and uses the pike module to detect and ban IPs that send a traffic spike. It then answers requests from known scanner User-Agent values with a 200 reply without processing them further, enforces the Max-Forwards limit, answers OPTIONS keepalives addressed to the server itself, and performs a sanity check on the SIP message.

    # Per SIP request initial checks
    def ksr_route_reqinit(self, msg):
        if not KSR.is_myself(KSR.pv.get("$si")) :
            if not KSR.pv.is_null("$sht(ipban=>$si)") :
                # ip is already blocked
                KSR.dbg("request from blocked IP - " + KSR.pv.get("$rm")
                        + " from " + KSR.pv.get("$fu") + " (IP:"
                        + KSR.pv.get("$si") + ":" + str(KSR.pv.get("$sp")) + ")\n")
                return -255
            if KSR.pike.pike_check_req()<0 :
                KSR.err("ALERT: pike blocking " + KSR.pv.get("$rm")
                        + " from " + KSR.pv.get("$fu") + " (IP:"
                        + KSR.pv.get("$si") + ":" + str(KSR.pv.get("$sp")) + ")\n")
                KSR.pv.seti("$sht(ipban=>$si)", 1)
                return -255
        if KSR.corex.has_user_agent() > 0 :
            ua = KSR.pv.gete("$ua")
            if (ua.find("friendly")!=-1 or ua.find("scanner")!=-1
                    or ua.find("sipcli")!=-1 or ua.find("sipvicious")!=-1
                    or ua.find("VaxSIPUserAgent")!=-1 or ua.find("pplsip")!=-1) :
                KSR.sl.sl_send_reply(200, "Processed")
                return -255
        if KSR.maxfwd.process_maxfwd(10) < 0 :
            KSR.sl.sl_send_reply(483, "Too Many Hops")
            return -255
        if (KSR.is_OPTIONS()
                and KSR.is_myself_ruri()
                and KSR.corex.has_ruri_user() < 0) :
            KSR.sl.sl_send_reply(200, "Keepalive")
            return -255
        if KSR.sanity.sanity_check(17895, 7)<0 :
            KSR.err("Malformed SIP message from "
                    + KSR.pv.get("$si") + ":" + str(KSR.pv.get("$sp")) +"\n")
            return -255
        # all checks passed
        return 1
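
The chain of ua.find(...) calls in the User-Agent check can be hard to read and extend. As a possible refactoring, here is a small sketch of a helper that performs the same case-sensitive substring test; the names SCANNER_MARKERS and is_scanner_ua are my own and are not part of the Kamailio example.

```python
# Substrings taken from the User-Agent check above.
SCANNER_MARKERS = (
    "friendly",
    "scanner",
    "sipcli",
    "sipvicious",
    "VaxSIPUserAgent",
    "pplsip",
)


def is_scanner_ua(ua):
    """Return True if the User-Agent contains any known scanner marker.

    The match is case-sensitive, exactly like str.find() in the original check.
    """
    return any(marker in ua for marker in SCANNER_MARKERS)
```

Inside ksr_route_reqinit, the condition would then read `if is_scanner_ua(KSR.pv.gete("$ua")):`. Because the test is case-sensitive, a value such as "SIPVicious" would not match; lower-casing both sides is an option if that matters for your deployment.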

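The next function, ksr_route_natdetect(self, msg), detects whether a SIP request comes from a client behind a NAT device. It always forces rport handling; if the NAT test matches, it records the real source address for REGISTER requests, adds a Contact alias for other first-hop requests, and sets the FLT_NATS flag so that later routing steps know the sender is behind NAT. The function always returns 1.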

    # Caller NAT detection
    def ksr_route_natdetect(self, msg):
        KSR.force_rport()
        if KSR.nathelper.nat_uac_test(19)>0 :
            if KSR.is_REGISTER() :
                KSR.nathelper.fix_nated_register()
            elif KSR.siputils.is_first_hop()>0 :
                KSR.nathelper.set_contact_alias()
            KSR.setflag(FLT_NATS)
        return 1
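
The argument 19 passed to nat_uac_test is a bitmask that selects which NAT tests to run. The sketch below decodes such a value; the flag descriptions are paraphrased from the nathelper module documentation as I understand it and cover only the lowest five bits, so please verify them against the documentation of your Kamailio version. The names NAT_TEST_FLAGS and decode_nat_tests are hypothetical.

```python
# Flag meanings paraphrased from the nathelper module documentation;
# verify them against the documentation of your Kamailio version.
NAT_TEST_FLAGS = {
    1: "Contact header contains a private (RFC 1918) address",
    2: "address in Via differs from the source IP (the 'received' test)",
    4: "topmost Via contains a private (RFC 1918) address",
    8: "SDP contains a private (RFC 1918) address",
    16: "source port differs from the port in Via",
}


def decode_nat_tests(value):
    """Return the known flag bits that are set in a nat_uac_test() argument."""
    return [bit for bit in sorted(NAT_TEST_FLAGS) if value & bit]


if __name__ == "__main__":
    for bit in decode_nat_tests(19):
        print(bit, "-", NAT_TEST_FLAGS[bit])
```

Under these assumptions, 19 = 1 + 2 + 16, so the example combines the Contact test, the "received" test and the source port test.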

The third function, ksr_route_registrar(self, msg), checks whether the request is a REGISTER. If it is not, the function returns 1 and the request continues through the routing logic. If it is, the code checks whether NAT detection set the FLT_NATS flag and, if so, sets the branch flags that mark the contact as being behind NAT and enable SIP NAT pinging. Finally, the code saves the user’s location and returns -255, indicating that the request should not be processed further.

    # Handle SIP registrations
    def ksr_route_registrar(self, msg):
        if not KSR.is_REGISTER() :
            return 1
        if KSR.isflagset(FLT_NATS) :
            KSR.setbflag(FLB_NATB)
            # do SIP NAT pinging
            KSR.setbflag(FLB_NATSIPPING)
        if KSR.registrar.save("location", 0)<0 :
            KSR.sl.sl_reply_error()
        return -255
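
The flag names FLT_NATS, FLB_NATB and FLB_NATSIPPING are not defined in the snippets above. In a KEMI script they would typically be module-level constants that mirror the flag numbers defined in kamailio.cfg. The values below are an assumption based on the stock configuration and should be checked against your own kamailio.cfg.

```python
# Assumed values, mirroring the flag defines of a default kamailio.cfg.
FLT_NATS = 5        # transaction flag: request comes from behind NAT
FLB_NATB = 6        # branch flag: the registered contact is behind NAT
FLB_NATSIPPING = 7  # branch flag: send SIP keepalive pings to the contact
```

Whatever numbers you use, they have to match the flags that the nathelper and usrloc related settings in your configuration file refer to.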

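The fourth function, ksr_route_auth(self, msg), performs IP authorization and user authentication. Note that ksr_request_route calls it before ksr_route_registrar, so a REGISTER is only saved after it has been authenticated. Non-REGISTER requests from a source address allowed by the permissions module pass without authentication. REGISTER requests, and requests whose From URI domain is local, must carry valid credentials; otherwise the function sends an authentication challenge and returns -255. Finally, if neither the From URI nor the Request-URI belongs to a local domain, it replies with 403 so that the server does not act as an open relay.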

    # IP authorization and user authentication
    def ksr_route_auth(self, msg):
        if not KSR.is_REGISTER() :
            if KSR.permissions.allow_source_address(1)>0 :
                # source IP allowed
                return 1
        if KSR.is_REGISTER() or KSR.is_myself_furi() :
            # authenticate requests
            if KSR.auth_db.auth_check(KSR.pv.get("$fd"), "subscriber", 1)<0 :
                KSR.auth.auth_challenge(KSR.pv.get("$fd"), 0)
                return -255
            # user authenticated - remove auth header
            if not KSR.is_method_in("RP") :
                KSR.auth.consume_credentials()
        # if caller is not local subscriber, then check if it calls
        # a local destination, otherwise deny, not an open relay here
        if (not KSR.is_myself_furi()) and (not KSR.is_myself_ruri()) :
            KSR.sl.sl_send_reply(403, "Not relaying")
            return -255
        return 1
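
To make the branches of ksr_route_auth easier to follow, here is a sketch that models the same decisions as a pure function. The function name auth_decision and its boolean parameters are hypothetical; each parameter stands for the result of the corresponding KSR call in the code above.

```python
def auth_decision(is_register, source_allowed, from_local, ruri_local, credentials_ok):
    """Mirror the branches of ksr_route_auth and return the outcome as a string."""
    if not is_register and source_allowed:
        return "pass"           # trusted source IP, no authentication needed
    if is_register or from_local:
        if not credentials_ok:
            return "challenge"  # an auth challenge is sent, processing stops
    if not from_local and not ruri_local:
        return "403"            # neither side is local: not an open relay
    return "pass"
```

For example, auth_decision(False, False, False, True, False) returns "pass": a request from a foreign domain to a local user is accepted without credentials, which is what allows inbound calls from other domains.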

The last function, ksr_route_location(self, msg), routes a request to a registered user. First it looks up the user’s location using the registrar module’s lookup function. If the lookup fails with one of the handled return codes, it replies with 404 Not Found or 405 Method Not Allowed. Otherwise, the request is relayed statefully using the t_relay function, and if the relay fails, an error reply is sent. The function always returns -255.

    # User location service
    def ksr_route_location(self, msg):
        rc = KSR.registrar.lookup("location")
        if rc<0 :
            KSR.tm.t_newtran()
            if rc==-1 or rc==-3 :
                KSR.sl.send_reply(404, "Not Found")
                return -255
            elif rc==-2 :
                KSR.sl.send_reply(405, "Method Not Allowed")
                return -255

        # Relay the message statefully to the specified host and port
        if KSR.tm.t_relay()<0 :
            KSR.sl.sl_reply_error()
        return -255
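
The mapping from lookup return codes to replies can also be written as a small table. This is only a sketch of the same logic in plain Python; LOOKUP_REPLIES and reply_for_lookup are hypothetical names and not part of the Kamailio example.

```python
# Return codes handled explicitly by ksr_route_location.
LOOKUP_REPLIES = {
    -1: (404, "Not Found"),
    -3: (404, "Not Found"),
    -2: (405, "Method Not Allowed"),
}


def reply_for_lookup(rc):
    """Return (code, reason) for a handled lookup failure.

    Returns None for any other value, in which case the request is relayed.
    """
    return LOOKUP_REPLIES.get(rc)
```

A table like this keeps the reply codes in one place if you later want to handle additional return codes.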

Reference: Kamailio GitHub official repository example

Thank you for visiting my blog. If you want to stay up to date with my latest posts on VoIP technologies, Open Source, Cloud, and Artificial Intelligence, I highly recommend subscribing to my blog.
