Merge pull request 'Reformatting' (#2) from Yuuki/EliteBot:master into master

Reviewed-on: Raiza.dev/EliteBot#2
*pulls yuuki*
This commit is contained in:
Colby 2024-02-18 15:26:21 +01:00
commit f86a5aa62a
5 changed files with 42 additions and 31 deletions

View file

@ -1,9 +1,10 @@
from src.plugin_base import PluginBase
from src.channel_manager import ChannelManager
import sys import sys
class Plugin(PluginBase): from src.channel_manager import ChannelManager
from src.plugin_base import PluginBase
class Plugin(PluginBase):
def handle_message(self, source_nick, channel, message): def handle_message(self, source_nick, channel, message):
message_parts = message.split() message_parts = message.split()
self.channel_manager = ChannelManager() self.channel_manager = ChannelManager()
@ -26,7 +27,6 @@ class Plugin(PluginBase):
self.bot.ircsend(f'PART {message_parts[1]}') self.bot.ircsend(f'PART {message_parts[1]}')
self.channel_manager.remove_channel(message_parts[1]) self.channel_manager.remove_channel(message_parts[1])
elif message_parts[0] == "!quit": elif message_parts[0] == "!quit":
if len(message_parts) == 0: if len(message_parts) == 0:
quit_message = 'EliteBot!' quit_message = 'EliteBot!'

View file

@ -1 +1,2 @@
colorama==0.4.6 colorama==0.4.6
PyYAML==6.0.1

View file

@ -1,19 +1,22 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import importlib.util
import inspect
import json
import os
import socket import socket
import ssl import ssl
import time
import json
import yaml
import inspect
import sys import sys
import os import time
import importlib.util
import yaml
from src.channel_manager import ChannelManager from src.channel_manager import ChannelManager
from src.logger import Logger from src.logger import Logger
from src.plugin_base import PluginBase from src.plugin_base import PluginBase
from src.sasl import handle_sasl, handle_authenticate, handle_903 from src.sasl import handle_sasl, handle_authenticate, handle_903
class Bot: class Bot:
def __init__(self, config_file): def __init__(self, config_file):
self.config = self.load_config(config_file) self.config = self.load_config(config_file)
@ -51,11 +54,11 @@ class Bot:
def load_plugins(self): def load_plugins(self):
self.plugins = [] self.plugins = []
plugin_folder = "./plugins" plugin_folder = './plugins'
for filename in os.listdir(plugin_folder): for filename in os.listdir(plugin_folder):
if filename.endswith('.py'): if filename.endswith('.py'):
filepath = os.path.join(plugin_folder, filename) filepath = os.path.join(plugin_folder, filename)
spec = importlib.util.spec_from_file_location("module.name", filepath) spec = importlib.util.spec_from_file_location('module.name', filepath)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) spec.loader.exec_module(module)
for name, obj in inspect.getmembers(module): for name, obj in inspect.getmembers(module):
@ -122,9 +125,10 @@ class Bot:
try: try:
self.ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if str(self.config["Connection"].get("Port"))[:1] == '+': if str(self.config['Connection'].get('Port'))[:1] == '+':
context = ssl.create_default_context() context = ssl.create_default_context()
self.ircsock = context.wrap_socket(self.ircsock, server_hostname=self.config["Connection"].get("Hostname")) self.ircsock = context.wrap_socket(self.ircsock,
server_hostname=self.config['Connection'].get('Hostname'))
port = int(self.config['Connection'].get('Port')[1:]) port = int(self.config['Connection'].get('Port')[1:])
else: else:
port = int(self.config['Connection'].get('Port')) port = int(self.config['Connection'].get('Port'))
@ -135,7 +139,7 @@ class Bot:
self.ircsock.connect_ex((self.config['Connection'].get('Hostname'), port)) self.ircsock.connect_ex((self.config['Connection'].get('Hostname'), port))
self.ircsend(f'NICK {self.config["Connection"].get("Nick")}') self.ircsend(f'NICK {self.config["Connection"].get("Nick")}')
self.ircsend(f'USER {self.config["Connection"].get("Ident")} * * :{self.config["Connection"].get("Name")}') self.ircsend(f'USER {self.config["Connection"].get("Ident")} * * :{self.config["Connection"].get("Name")}')
if self.config["SASL"].get("UseSASL"): if self.config['SASL'].get('UseSASL'):
self.ircsend('CAP REQ :sasl') self.ircsend('CAP REQ :sasl')
except Exception as e: except Exception as e:
self.logger.error(f"Error establishing connection: {e}") self.logger.error(f"Error establishing connection: {e}")

View file

@ -4,12 +4,13 @@ import json
import os import os
from os import path from os import path
class ChannelManager: class ChannelManager:
def __init__(self): def __init__(self):
self.channels = self._load_channels() self.channels = self._load_channels()
def _load_channels(self): def _load_channels(self):
os.makedirs("data", exist_ok=True) os.makedirs('data', exist_ok=True)
if not path.exists('data/channels.json'): if not path.exists('data/channels.json'):
with open('data/channels.json', 'w') as f: with open('data/channels.json', 'w') as f:
json.dump([], f) json.dump([], f)
@ -18,10 +19,10 @@ class ChannelManager:
with open('data/channels.json', 'r') as f: with open('data/channels.json', 'r') as f:
return json.load(f) return json.load(f)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}") print(f'Error decoding JSON: {e}')
return [] return []
except Exception as e: except Exception as e:
print(f"Error loading channels: {e}") print(f'Error loading channels: {e}')
return [] return []
def save_channel(self, channel): def save_channel(self, channel):
@ -37,12 +38,12 @@ class ChannelManager:
self._write_channels() self._write_channels()
def _write_channels(self): def _write_channels(self):
os.makedirs("data", exist_ok=True) os.makedirs('data', exist_ok=True)
try: try:
with open('data/channels.json', 'w') as f: with open('data/channels.json', 'w') as f:
json.dump(self.channels, f) json.dump(self.channels, f)
except Exception as e: except Exception as e:
print(f"Error saving channels: {e}") print(f'Error saving channels: {e}')
def get_channels(self): def get_channels(self):
return self.channels return self.channels

View file

@ -4,6 +4,7 @@ import base64
NULL_BYTE = '\x00' NULL_BYTE = '\x00'
ENCODING = 'UTF-8' ENCODING = 'UTF-8'
def handle_sasl(config, ircsend): def handle_sasl(config, ircsend):
""" """
Handles SASL authentication by sending an AUTHENTICATE command. Handles SASL authentication by sending an AUTHENTICATE command.
@ -14,6 +15,7 @@ def handle_sasl(config, ircsend):
""" """
ircsend('AUTHENTICATE PLAIN') ircsend('AUTHENTICATE PLAIN')
def handle_authenticate(args, config, ircsend): def handle_authenticate(args, config, ircsend):
""" """
Handles the AUTHENTICATE command response. Handles the AUTHENTICATE command response.
@ -24,12 +26,15 @@ def handle_authenticate(args, config, ircsend):
ircsend (function): Function to send IRC commands ircsend (function): Function to send IRC commands
""" """
if args[0] == '+': if args[0] == '+':
if "SASLNick" in config['SASL'] and "SASLPassword" in config['SASL']: if 'SASLNick' in config['SASL'] and 'SASLPassword' in config['SASL']:
authpass = f"{config['SASL']['SASLNick']}{NULL_BYTE}{config['SASL']['SASLNick']}{NULL_BYTE}{config['SASL']['SASLPassword']}" authpass = (f'{config["SASL"]["SASLNick"]}{NULL_BYTE}'
f'{config["SASL"]["SASLNick"]}{NULL_BYTE}'
f'{config["SASL"]["SASLPassword"]}')
ap_encoded = base64.b64encode(authpass.encode(ENCODING)).decode(ENCODING) ap_encoded = base64.b64encode(authpass.encode(ENCODING)).decode(ENCODING)
ircsend(f'AUTHENTICATE {ap_encoded}') ircsend(f'AUTHENTICATE {ap_encoded}')
else: else:
raise KeyError("SASLNICK and/or SASLPASS not found in config") raise KeyError('SASLNICK and/or SASLPASS not found in config')
def handle_903(ircsend): def handle_903(ircsend):
""" """