diff -uNr a/blatta/README.txt b/blatta/README.txt --- a/blatta/README.txt 406509e8f3c1ed2099f5c227646af37c29f8d14d0d4303e2fec2602d9bb8bd5a0ad90374173b9cdc49edaf59fd979dca5015449620b692d8715bc87f6ebe0c75 +++ b/blatta/README.txt a20c712e13dbe20bfc57019a6d530124742129436f8a683726d378b8ae9047aafaf0aa746ba826a29a64bcdfee6ab20dfc24c98a759bf4cbd1d83ed4b3d393a5 @@ -33,4 +33,13 @@ NOTES: To run the unit tests, you'll need to run: -pip install mock \ No newline at end of file +pip install mock + +To use experimental mcrypt support: +1. Install the mcrypt dev lib. On my dulap-style box this was at dev-libs/libmcrypt-2.5.8-r4 +On an Ubuntoid, this would be libmcrypt-dev + +2. Get python-mcrypt: https://labix.org/python-mcrypt +3. Run: + python setup.py build + python setup.py install \ No newline at end of file diff -uNr a/blatta/VERSION b/blatta/VERSION --- a/blatta/VERSION e939319655cbd7d710f012c0986cbe8b900a3a82ef28006dbdddb03efc993211886356770f832525ea2e0ef08d92cb1bb7d34588446c7c834b2c24a9198a22f7 +++ b/blatta/VERSION 3fc13965be21f7e838020dca5a18631653c724197d17c208a04fa83ccf31abd2a7e57d36de1c44b1385993ba72d60b4e9edf18c78936e0f101d9191af8386f6c @@ -1 +1 @@ -9972 +9971 diff -uNr a/blatta/lib/address_cast.py b/blatta/lib/address_cast.py --- a/blatta/lib/address_cast.py c38d152f4b5c292255d08c410e2b1ffb814082fd766eedb2be209bf0037e9b3b62e3666cbf01cdc3651b3be73598bcf4fc39b583fb2afee62ce519ee6f1d023e +++ b/blatta/lib/address_cast.py 5a7c5b92b27732f6376838f8ee8b0def493cb07f7d7fd5b3370024c899ff30dcd2ce756838db9ba89f5dd4bce35a73e40935a02991d2910f7ec13a91ccd57b99 @@ -7,12 +7,15 @@ import time from message_exception import MessageException, DUPLICATE_PACKET -from serpent import Serpent, serpent_cbc_decrypt, serpent_cbc_encrypt from message import Message, MESSAGE_PACKET_FORMAT from commands import ADDRESS_CAST from prod import get_ascii_address from prod import OUTGOING from prod import INCOMING +try: + import mcrypt +except: + from serpent import Serpent, serpent_cbc_decrypt, serpent_cbc_encrypt BLACK_PACKET_FORMAT = "<272s48s4s" RED_PACKET_FORMAT = "<16sL6s246s" @@ -43,7 +46,7 @@ if self.long_buffer.has(self.message_hash): raise MessageException(DUPLICATE_PACKET, self.metadata, self) - # if the signature checks out for a peer with no AT entry, decrypt + # if the signature checks out decrypt for peer in self.state.get_keyed_peers(exclude_addressless=False): if peer: ( @@ -63,21 +66,27 @@ continue # try to decrypt black packet - Serpent(cipher_key) - red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes) + try: + serpent_cbc = mcrypt.MCRYPT('serpent', 'cbc') + serpent_cbc.init(cipher_key) + red_packet_bytes = serpent_cbc.decrypt(black_packet_bytes) + except: + logging.warn("Couldn't use mcrypt. Using pure python serpent algo instead.") + Serpent(cipher_key) + red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes) try: (command, address) = struct.unpack(RED_PACKET_FORMAT, red_packet_bytes)[1:3] self.origin_peer = peer self.address = address - self.log_incoming(message.get('peer'), peer.handles[0], get_ascii_address(address)) + self.log_incoming('info', message.get('peer'), peer.handles[0], get_ascii_address(address)) return self except NameError as ne: # This message was not intended for us. # We must forward it on if the bounce count is below the cutoff pass - self.log_incoming(message.get('peer'), "N/A", "N/A") + self.log_incoming('debug', message.get('peer'), "N/A", "N/A") return self # Increment bounce count and send to everyone except originating peer @@ -111,7 +120,13 @@ os.urandom(RED_PACKET_PADDING_LENGTH) ) # form the black packet by encrypting and signing the red packet - black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes) + try: + serpent_cbc = mcrypt.MCRYPT('serpent', 'cbc') + serpent_cbc.init(cipher_key) + black_packet_bytes = serpent_cbc.encrypt(red_packet_bytes) + except: + logging.warn("Couldn't use mcrypt. Using pure python serpent algo instead.") + black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes) # sign and pack the black address cast packet seal = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest() @@ -131,15 +146,19 @@ self.log_outgoing(red_packet_bytes, cold_peer, peer) - def log_incoming(self, peer, origin_peer_handle, address): - logging.info(LOGGING_FORMAT % ( + def log_incoming(self, level, peer, origin_peer_handle, address): + params = ( peer.address, peer.port, peer.handles[0], INCOMING, "ADDRESS_CAST", origin_peer_handle, - address)) + address) + if level == 'info': + logging.info(LOGGING_FORMAT % params) + else: + logging.debug(LOGGING_FORMAT % params) def log_outgoing(self, red_packet_bytes, cold_peer, peer): ( diff -uNr a/blatta/lib/ignore.py b/blatta/lib/ignore.py --- a/blatta/lib/ignore.py f0b5c7f1b1d3c36ecd28449802b8025b02e4123f4fcd07e189d0e83ec05c2453a8ff8110419abc58bff32a36172ec2d9267d2db7ba250bf745cfe15f8336d15f +++ b/blatta/lib/ignore.py 96a80e229898cf9d2fc83a80f7e00c13b4440562e170a2ae2733c905901fcf8f44a5657f5a638918e0b9f1094faa63b4629e4b776e70bd638672562cc77599f5 @@ -1,11 +1,13 @@ +import binascii import logging import time import hashlib import os from message import Message -from commands import IGNORE +from commands import IGNORE, COMMAND_LABELS from message import MAX_MESSAGE_LENGTH +OUTGOING_MESSAGE_LOGGING_FORMAT = u"[%s:%d %s] <- %s %s %s %s" class Ignore(Message): def __init__(self, message, state): @@ -50,5 +52,14 @@ for peer in self.state.get_keyed_peers(exclude_addressless=True): signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes) peer.send(signed_packet_bytes) - if os.environ.get('LOG_RUBBISH'): - self.log_rubbish(peer) \ No newline at end of file + self.log_rubbish(peer) + + def log_rubbish(self, peer): + logging.debug(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address, + peer.port, + peer.handles[0], + COMMAND_LABELS[self.command], + "", + self.bounces, + binascii.hexlify(self.message_hash))) + diff -uNr a/blatta/lib/message.py b/blatta/lib/message.py --- a/blatta/lib/message.py 853561293347d88a3da3bafbd9dafb48c23eeda3e05d89587ff485a129565d9e30b231d50902bd16040a0de8e6ccf37e98b4be342c318ecec9883525629a6fc3 +++ b/blatta/lib/message.py 2412b895a22c1f003fb2503cb304c689c72f180641c84a463c7ecc4cdf86db2b824855fa318d0ed076d41d8ce6c260304624d862e9a4d394daa11034ff717b4b @@ -1,4 +1,3 @@ -from serpent import serpent_cbc_encrypt from commands import COMMAND_LABELS from message_exception import MessageException, INVALID_HANDLE_ENCODING import hashlib @@ -9,6 +8,10 @@ import hmac import os import logging +try: + import mcrypt +except: + from serpent import serpent_cbc_encrypt PEST_VERSION = 0xFC EARLIEST_SUPPORTED_PEST_VERSION = 0xFC @@ -69,7 +72,13 @@ command, cls._pad(message_bytes, MAX_MESSAGE_LENGTH)) - black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes) + try: + serpent_cbc = mcrypt.MCRYPT('serpent', 'cbc') + serpent_cbc.init(cipher_key) + black_packet_bytes = serpent_cbc.encrypt(red_packet_bytes) + except Exception as ex: + logging.warn("Couldn't use mcrypt. Using pure python serpent algo instead.") + black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes) # sign packet signature_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest() @@ -153,15 +162,6 @@ self.bounces, binascii.hexlify(self.compute_message_hash()))) - def log_rubbish(self, peer): - logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address, - peer.port, - peer.handles[0], - COMMAND_LABELS[self.command], - "", - self.bounces, - binascii.hexlify(self.message_hash))) - def log_incoming(self, peer): try: logging.info(INCOMING_MESSAGE_LOGGING_FORMAT % (peer.address, diff -uNr a/blatta/lib/message_factory.py b/blatta/lib/message_factory.py --- a/blatta/lib/message_factory.py 845a88cabc0a29e8faa9ddfd059dde6069e8d249c85b3c47b661599930d2f9cfaaa3727208cc006bee887fa717409c085ba7255d9d41fde3fe640ec18ef539f9 +++ b/blatta/lib/message_factory.py 8a3a514b88446d8ab20fe04c77568ef829d0d01f25ff19afb4906d9e42a89b1f35ec3800cdd8500adc41d1eca6a6428e2cb8aa13248bb591354ff019bd5f56e4 @@ -2,6 +2,8 @@ import hashlib import hmac import struct +import logging + from prod import Prod from address_cast import AddressCast @@ -9,11 +11,13 @@ from broadcast import Broadcast from ignore import Ignore from getdata import GetData -from serpent import Serpent -from serpent import serpent_cbc_decrypt from message_exception import MessageException from message import PEST_VERSION, EARLIEST_SUPPORTED_PEST_VERSION -import logging +try: + import mcrypt +except: + from serpent import Serpent + from serpent import serpent_cbc_decrypt #command codes from commands import BROADCAST, DIRECT, PROD, GETDATA, IGNORE, ADDRESS_CAST @@ -60,8 +64,14 @@ continue # try to decrypt black packet - Serpent(cipher_key) - red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes) + try: + serpent_cbc = mcrypt.MCRYPT('serpent', 'cbc') + serpent_cbc.init(cipher_key) + red_packet_bytes = serpent_cbc.decrypt(black_packet_bytes) + except Exception as ex: + logging.warn("Couldn't use mcrypt. Using pure python serpent algo instead.") + Serpent(cipher_key) + red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes) # unpack red packet try: diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py --- a/blatta/lib/server.py b9b367ef99ff3627ea41e12893e777a7a2e000c3fcbcb917c2c66646a65d0c1713d4ebf08323a55770e44d42f9e07a00669f07a6c7da4da262eed5e751fc2151 +++ b/blatta/lib/server.py 2dabb109066e226fac190ce0651dd284126bca3b131d1c89ea5f97c3aa0affd6f649dae5b108ea20a92fd2efdf692569cb89fae171bcf03a332cf2c061f87bba @@ -5,12 +5,13 @@ import random from funcs import * -from client import Client +from client import Client from channel import Channel from message import PACKET_SIZE import pprint import logging + class Server(object): def __init__(self, options, station): self.station = station @@ -35,6 +36,15 @@ self.client = None self.nicknames = {} # irc_lower(Nickname) --> Client instance. + self.last_aliveness_check = time.time() + self.last_short_buffer_check = time.time() + self.last_rubbish_dispatch = time.time() + self.last_order_buffer_check = time.time() + self.last_presence_check = time.time() + self.last_prod = time.time() + self.last_address_cast = time.time() + self.initial_prod_sent = False + def get_client(self, nickname): return self.nicknames.get(irc_lower(nickname)) @@ -87,111 +97,120 @@ self.station.socket = self.udp_server_socket logging.info("Listening for Pest packets on udp port %d." % self.udp_port) - serversockets = [] - for port in self.irc_ports: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - s.bind((self.irc_server_address, port)) - except socket.error as e: - logging.error("Could not bind port %s: %s." % (port, e)) - sys.exit(1) - s.listen(5) - serversockets.append(s) - del s - logging.info("Listening for IRC connections on port %d." % port) - - # event loop setup - last_aliveness_check = time.time() - last_short_buffer_check = time.time() - last_rubbish_dispatch = time.time() - last_order_buffer_check = time.time() - last_presence_check = time.time() - last_prod = time.time() - last_address_cast = time.time() - initial_prod_sent = False + irc_server_socket = self.get_irc_server_socket() while True: # we don't want to be listening for client connections if there's already a client connected if self.client is None: - input_sockets = serversockets + irc_input_socket = irc_server_socket else: - input_sockets = [self.client.socket] + irc_input_socket = self.client.socket output_sockets = ([self.client.socket] - if self.client and self.client.write_queue_size() > 0 else []) + if self.client and self.client.write_queue_size() > 0 else []) - # handle tcp socket events - (iwtd, owtd, ewtd) = select.select(input_sockets, output_sockets, [], .2) + # handle tcp and udp socket events + (iwtd, owtd, ewtd) = select.select( + [irc_input_socket, self.udp_server_socket], + output_sockets, + [], + 0.5 #timeout + ) for x in iwtd: - if self.client is not None: + if x == self.udp_server_socket: + bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE) + self.station.handle_udp_data(bytes_address_pair) + elif self.client is not None: self.client.socket_readable_notification() - else: - try: - (conn, addr) = x.accept() - self.client = Client(self, conn) - self.station.client = self.client - self.client.state = self.station.state - self.client.long_buffer = self.station.long_buffer - logging.info("Accepted connection from %s:%s." % ( - addr[0], addr[1])) - except socket.error as e: - logging.error("Failed to accept new client connection: %s" % e) + elif x == irc_server_socket: + self.connect_to_client(x) for x in owtd: if self.client and x == self.client.socket: # client may have been disconnected self.client.socket_writable_notification() - # handle udp socket events - (inputready,outputready,exceptready) = select.select([self.udp_server_socket],[],[],0) - for x in inputready: - if x == self.udp_server_socket: - bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE) - self.station.handle_udp_data(bytes_address_pair) + self.trigger_events() + + def connect_to_client(self, sock): + try: + (conn, addr) = sock.accept() + self.client = Client(self, conn) + self.station.client = self.client + self.client.state = self.station.state + self.client.long_buffer = self.station.long_buffer + logging.info("Accepted connection from %s:%s." % ( + addr[0], addr[1])) + except socket.error as e: + logging.error("Failed to accept new client connection: %s" % e) + + def interpret_none_as_zero(self, knob): + value = self.station.state.get_knob(knob) + if value is None: + return 0 + else: + return float(value) - # ping pong - now = time.time() - if last_aliveness_check + 10 < now: - if self.client: - self.client.check_aliveness() - last_aliveness_check = now - - # clear embargo queue if enough time has elapsed - if last_short_buffer_check + float(self.station.state.get_knob('short_buffer_check_interval_seconds')) < now: - self.station.check_short_buffer() - last_short_buffer_check = now - - # spray rubbish - if last_rubbish_dispatch + float(self.station.state.get_knob('rubbish_interval_seconds')) < now: - self.station.send_rubbish() - last_rubbish_dispatch = now - - # check order buffer - if last_order_buffer_check + float(self.station.state.get_knob('order_buffer_check_seconds')) < now: - self.station.check_order_buffer() - last_order_buffer_check = now - - # check presence - if last_presence_check + float(self.station.state.get_knob('presence_check_seconds')) < now: - self.station.report_presence() - last_presence_check = now - - # broadcast address cast packets - if last_address_cast + 1 < now: - range_limit = self.station.state.get_knob('address_cast_interval_seconds') - if random.randint(1, range_limit) == range_limit: + def get_irc_server_socket(self): + for port in self.irc_ports: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind((self.irc_server_address, port)) + except socket.error as e: + logging.error("Could not bind port %s: %s." % (port, e)) + sys.exit(1) + s.listen(5) + logging.info("Listening for IRC connections on port %d." % port) + return s + + def trigger_events(self): + # ping pong + now = time.time() + if self.last_aliveness_check + 10 < now: + if self.client: + self.client.check_aliveness() + self.last_aliveness_check = now + + # clear embargo queue if enough time has elapsed + if self.last_short_buffer_check + float( + self.station.state.get_knob('short_buffer_check_interval_seconds')) < now: + self.station.check_short_buffer() + self.last_short_buffer_check = now + + # spray rubbish + if self.last_rubbish_dispatch + float(self.station.state.get_knob('rubbish_interval_seconds')) < now: + self.station.send_rubbish() + self.last_rubbish_dispatch = now + + # check order buffer + if self.last_order_buffer_check + float(self.station.state.get_knob('order_buffer_check_seconds')) < now: + self.station.check_order_buffer() + self.last_order_buffer_check = now + + # check presence + if self.last_presence_check + float(self.station.state.get_knob('presence_check_seconds')) < now: + self.station.report_presence() + self.last_presence_check = now + + # broadcast address cast packets + address_cast_interval_seconds = self.interpret_none_as_zero('address_cast_interval_seconds') + if address_cast_interval_seconds != 0: + if self.last_address_cast + 1 < now: + range_limit = address_cast_interval_seconds + val = random.randint(1, range_limit) + if val == range_limit: self.station.send_address_cast() - last_address_cast = now + self.last_address_cast = now - # send prod - if self.station.state.get_knob('prod_interval_seconds') == None: - if not initial_prod_sent: - self.station.send_prod() - initial_prod_sent = True - else: - if last_prod + float(self.station.state.get_knob('prod_interval_seconds')) < now: - self.station.send_prod() - last_prod = now + # send prod + prod_interval_seconds = self.interpret_none_as_zero('prod_interval_seconds') + if prod_interval_seconds == 0: + if not self.initial_prod_sent: + self.station.send_prod() + self.initial_prod_sent = True + else: + if self.last_prod + prod_interval_seconds < now: + self.station.send_prod() + self.last_prod = now def create_directory(path): if not os.path.isdir(path): os.makedirs(path) - diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py 454c23bdcd92e53c4b7ba9b14e84f4f66958b70723a10a6079a151b8c5d404b8c0f748b10da5a7297496205ccbd5e87875009c0d34fcc9674095459694374bd8 +++ b/blatta/lib/state.py 1e468969a32877911c9f59fbfa8fab5dca982b73725e0f83d7c6b2373bcdfe613df83264bdaa5dc3644117db5fa1d8652d17abeaa33dba89ea1857e3ae9a2d6b @@ -26,11 +26,11 @@ 'short_buffer_expiration_seconds': 1, 'short_buffer_check_interval_seconds': 1, 'getdata_requests_expiration_seconds': 10000, - 'peer_offline_interval_seconds': 60, + 'peer_offline_interval_seconds': 120, 'peer_away_interval_seconds': 10 * 60, 'presence_check_seconds': 5, 'prod_interval_seconds': 0, - 'address_cast_interval_seconds': 65, + 'address_cast_interval_seconds': 0, 'cold_peer_seconds': 62, 'banner': "Blatta %s" % (VERSION), } @@ -373,7 +373,7 @@ def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[], no_at_only=False): cursor = self.cursor() - peer_ids = self.listify(cursor.execute("select peer_id from keys\ + peer_ids = self.listify(cursor.execute("select distinct peer_id from keys\ where peer_id not in (%s) order by random()" % ','.join( '?' * len(exclude_ids)), exclude_ids).fetchall()) @@ -383,7 +383,7 @@ handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): - logging.error("Peer with duplicate address or handle id detected: %s" % peer.handles[0]) + logging.warn("Peer with duplicate address or handle id detected: %s" % peer.handles[0]) if exclude_addressless and (peer.address is None or peer.port is None): continue if no_at_only and (peer.address is not None or peer.port is not None): diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py f6af75aae5a5533728cde6a1fe9c3cc3d589dbe5bc2f2db41b16cb908f6abeabcc77fee321133e8d3f4d0549f9105250afbf73d6c34b1c182e58277619c77694 +++ b/blatta/lib/station.py 5cecc3fef53aaf91a206b782d8f5257df0a63177d7ef1abe17b0f04bea1ed84f7abab15681c51993d80e2524b0e9d0c582681c9051e67bcd627acf347550d50b @@ -173,9 +173,7 @@ address = packet_info[0] port = packet_info[1] packet_sample = packet_info[2] - if os.environ.get('LOG_RUBBISH'): - logging.debug("[%s:%d] -> ignoring packet: %s" % (address, port, packet_sample)) - return + logging.debug("[%s:%d] -> ignoring packet: %s" % (address, port, packet_sample)) def handle_prod(self, prod): self.conditionally_update_at(prod, prod.metadata.get('address'))