W.I.P. Database support. #4

Merged
ComputerTech merged 2 commits from :master into master 2024-02-19 10:28:47 +01:00
8 changed files with 126 additions and 14 deletions

View file

@ -14,5 +14,8 @@
}, },
"Logging": { "Logging": {
"Console": true "Console": true
},
"Database": {
"ConnectionString": "mysql://user:password@host:port/database"
} }
} }

View file

@ -11,3 +11,5 @@ SASL:
SASLPassword: password SASLPassword: password
Logging: Logging:
Console: true Console: true
Database:
ConnectionString: mysql://user:password@host:port/database

View file

@ -1,34 +1,37 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys import sys
from src.bot import Bot from src.bot import Bot
def main(): def main():
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("Usage: python elitebot.py <config_file>") print('Usage: python elitebot.py <config_file>')
sys.exit(1) sys.exit(1)
config_file = sys.argv[1] config_file = sys.argv[1]
try: try:
bot = Bot(config_file) bot = Bot(config_file)
except FileNotFoundError as e: except FileNotFoundError as e:
print(f"Config file not found: {e}") print(f'Config file not found: {e}')
sys.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
print(f"Error loading bot: {e}") print(f'Error loading bot: {e}')
sys.exit(1) sys.exit(1)
try: try:
print("EliteBot started successfully!") print('EliteBot started successfully!')
bot.start() bot.start()
except Exception as e: except Exception as e:
print(f"Error starting EliteBot: {e}") print(f'Error starting EliteBot: {e}')
sys.exit(1) sys.exit(1)
if __name__ == "__main__":
if __name__ == '__main__':
try: try:
main() main()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nEliteBot has been stopped.") print('\nEliteBot has been stopped.')
except Exception as e: except Exception as e:
print(f"An unexpected error occurred: {e}") print(f'An unexpected error occurred: {e}')

View file

@ -27,7 +27,7 @@ class Plugin(PluginBase):
self.bot.ircsend(f'PART {message_parts[1]}') self.bot.ircsend(f'PART {message_parts[1]}')
self.channel_manager.remove_channel(message_parts[1]) self.channel_manager.remove_channel(message_parts[1])
elif message_parts[0] == "!quit": elif message_parts[0] == '!quit':
if len(message_parts) == 0: if len(message_parts) == 0:
quit_message = 'EliteBot!' quit_message = 'EliteBot!'
else: else:
@ -37,9 +37,9 @@ class Plugin(PluginBase):
self.bot.connected = False self.bot.connected = False
sys.exit() sys.exit()
elif message_parts[0] == "!raw": elif message_parts[0] == '!raw':
if len(message_parts) > 1: if len(message_parts) > 1:
if message_parts[1].upper() == "PRIVMSG" and len(message_parts) > 3: if message_parts[1].upper() == 'PRIVMSG' and len(message_parts) > 3:
raw_command = ' '.join(message_parts[1:3]) + " :" + ' '.join(message_parts[3:]) raw_command = ' '.join(message_parts[1:3]) + " :" + ' '.join(message_parts[3:])
else: else:
raw_command = ' '.join(message_parts[1:]) raw_command = ' '.join(message_parts[1:])

58
plugins/cookie.py Normal file
View file

@ -0,0 +1,58 @@
from sqlalchemy import Table, Column, Integer, String, MetaData, insert, select
from src.channel_manager import ChannelManager
from src.db import Database
from src.plugin_base import PluginBase
meta = MetaData()
cookie_table = Table(
'Cookie',
meta,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('name', String, unique=True, nullable=False),
Column('cookies', Integer, default=0),
Column('last', String, default='1999/01/01 00:00:00'),
)
c_db = Database(cookie_table, meta)
class Plugin(PluginBase):
def handle_message(self, source_nick, channel, message):
parts = message.split()
c_db.create_table('Cookie')
self.channel_manager = ChannelManager()
if parts[0].lower() == '!cookie':
if len(parts) == 1: # !cookie
self.insert_user(source_nick)
self.bot.ircsend(f'PRIVMSG {channel} :Nooooo~~')
c_db.set(source_nick, {'cookies': 1, 'last': '1999/01/01 00:00:01'})
elif len(parts) == 2: # !cookie USER
cookies = c_db.get(parts[1], 2)
if cookies == -1:
self.bot.ircsend(f'PRIVMSG {channel} :I\'ve looked everywhere for {parts[1]}, but I couldn\'t '
f'find them.')
else:
c = 'cookie'
if cookies == 0:
c = 'no cookies.'
elif cookies == 1:
c = f'{cookies} cookie.'
else:
c = f'{cookies} cookies.'
self.bot.ircsend(f'PRIVMSG {channel} :{parts[1]} currently has {c}')
def insert_user(self, user: str):
with c_db.engine.connect() as conn:
stmt = select(cookie_table).where(cookie_table.c.name == user)
cnt = len(conn.execute(stmt).fetchall())
if cnt == 0:
conn.execute((
insert(cookie_table).
values({'name': user})
))
conn.commit()

View file

@ -1,2 +1,7 @@
colorama==0.4.6 colorama==0.4.6
greenlet==3.0.3
mysql-connector==2.2.9
PyYAML==6.0.1 PyYAML==6.0.1
SQLAlchemy==2.0.27
SQLAlchemy-Utils==0.41.1
typing_extensions==4.9.0

View file

@ -21,6 +21,7 @@ class Bot:
def __init__(self, config_file): def __init__(self, config_file):
self.config = self.load_config(config_file) self.config = self.load_config(config_file)
self.validate_config(self.config) self.validate_config(self.config)
self.connection_string = self.config['Database'].get('ConnectionString')
self.channel_manager = ChannelManager() self.channel_manager = ChannelManager()
self.logger = Logger('logs/elitebot.log') self.logger = Logger('logs/elitebot.log')
self.connected = False self.connected = False
@ -36,7 +37,8 @@ class Bot:
["Connection", "Nick"], ["Connection", "Nick"],
["Connection", "Ident"], ["Connection", "Ident"],
["Connection", "Name"], ["Connection", "Name"],
["SASL", "UseSASL"] # ["SASL", "UseSASL"],
["Database", "ConnectionString"]
] ]
for field in required_fields: for field in required_fields:

39
src/db.py Normal file
View file

@ -0,0 +1,39 @@
from sqlalchemy import create_engine, Table, MetaData, inspect, update, select
from sqlalchemy_utils import database_exists, create_database
# TODO: Load ConnectionString from config
class Database:
def __init__(self, table: Table, meta: MetaData):
self.engine = create_engine(r'sqlite:///.\data\database.db')
self.table = table
self.meta = meta
if not database_exists(self.engine.url):
create_database(self.engine.url)
def create_table(self, table_name):
if not inspect(self.engine).has_table(table_name):
self.meta.create_all(self.engine)
def set(self, user: str, values: dict):
with self.engine.connect() as conn:
stmt = select(self.table).where(self.table.c.name == user)
cnt = len(conn.execute(stmt).fetchall())
if cnt == 1:
conn.execute((
update(self.table).
values(values)
))
conn.commit()
def get(self, user: str, index: int):
with self.engine.connect() as conn:
stmt = select(self.table).where(self.table.c.name == user)
cnt = len(conn.execute(stmt).fetchall())
if cnt == 1:
return conn.execute(select(self.table).where(self.table.c.name == user)).fetchone()[index]
else:
return -1