mirror of
https://github.com/nexy7574/LCC-bot.git
synced 2024-09-19 10:03:40 +01:00
add natural language processing to jimmy
This commit is contained in:
parent
f58a17fedd
commit
a137fd5519
7 changed files with 232 additions and 48 deletions
|
@ -6,8 +6,16 @@ from typing import Optional
|
|||
import discord
|
||||
from discord.ext import commands, tasks
|
||||
import config
|
||||
from utils import Assignments, Tutors, simple_embed_paginator, get_or_none, Student, hyperlink, console, \
|
||||
SelectAssigneesView
|
||||
from utils import (
|
||||
Assignments,
|
||||
Tutors,
|
||||
simple_embed_paginator,
|
||||
get_or_none,
|
||||
Student,
|
||||
hyperlink,
|
||||
console,
|
||||
SelectAssigneesView,
|
||||
)
|
||||
|
||||
BOOL_EMOJI = {True: "\N{white heavy check mark}", False: "\N{cross mark}"}
|
||||
|
||||
|
@ -114,7 +122,7 @@ class AssignmentsCog(commands.Cog):
|
|||
await assignment.update(reminders=assignment.reminders + [reminder_name])
|
||||
else:
|
||||
cur_text = msg_format.format(
|
||||
mentions=", ".join(map(self.resolve_user, assignment.assignees)) or '@everyone',
|
||||
mentions=", ".join(map(self.resolve_user, assignment.assignees)) or "@everyone",
|
||||
reminder_name=reminder_name,
|
||||
project_title=textwrap.shorten(assignment.title, 100, placeholder="..."),
|
||||
project_tutor=assignment.tutor.name.title(),
|
||||
|
@ -172,10 +180,7 @@ class AssignmentsCog(commands.Cog):
|
|||
f"(finished: {BOOL_EMOJI[assignment.finished]} | Submitted: {BOOL_EMOJI[assignment.submitted]})",
|
||||
inline=False,
|
||||
)
|
||||
embed.add_field(
|
||||
name="Assignees",
|
||||
value=", ".join(map(self.resolve_user, assignment.assignees)) or '*everyone*'
|
||||
)
|
||||
embed.add_field(name="Assignees", value=", ".join(map(self.resolve_user, assignment.assignees)) or "*everyone*")
|
||||
if assignment.reminders:
|
||||
embed.set_footer(text="Reminders sent: " + ", ".join(assignment.reminders))
|
||||
return embed
|
||||
|
@ -240,7 +245,7 @@ class AssignmentsCog(commands.Cog):
|
|||
"shared_doc": None,
|
||||
"due_by": None,
|
||||
"tutor": None,
|
||||
"assignees": []
|
||||
"assignees": [],
|
||||
}
|
||||
super().__init__(
|
||||
discord.ui.InputText(
|
||||
|
@ -329,8 +334,8 @@ class AssignmentsCog(commands.Cog):
|
|||
assigner = SelectAssigneesView()
|
||||
await msg.edit(
|
||||
content="Please select people who've been assigned to this task (leave blank or skip to assign"
|
||||
" everyone)",
|
||||
view=assigner
|
||||
" everyone)",
|
||||
view=assigner,
|
||||
)
|
||||
await assigner.wait()
|
||||
self.create_kwargs["assignees"] = [x.id for x in assigner.users]
|
||||
|
@ -611,9 +616,7 @@ class AssignmentsCog(commands.Cog):
|
|||
async def view_details(self, _, interaction: discord.Interaction):
|
||||
await interaction.response.defer(ephemeral=True)
|
||||
await assignment.created_by.load()
|
||||
await interaction.followup.send(
|
||||
embed=cog.generate_assignment_embed(assignment), ephemeral=True
|
||||
)
|
||||
await interaction.followup.send(embed=cog.generate_assignment_embed(assignment), ephemeral=True)
|
||||
await self.update_display(interaction)
|
||||
|
||||
await ctx.respond(view=EditAssignmentView())
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
from datetime import datetime, time
|
||||
|
||||
import discord
|
||||
|
@ -13,9 +13,24 @@ RTL = "\N{leftwards black arrow}\U0000fe0f"
|
|||
|
||||
class Events(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot: commands.Bot = bot
|
||||
self.bot = bot
|
||||
self.lupupa_warning_task.start()
|
||||
|
||||
# noinspection DuplicatedCode
|
||||
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
|
||||
"""Analyse text for positivity, negativity and neutrality."""
|
||||
|
||||
def inner():
|
||||
try:
|
||||
from utils.sentiment_analysis import intensity_analyser
|
||||
except ImportError:
|
||||
return None
|
||||
scores = intensity_analyser.polarity_scores(text)
|
||||
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
|
||||
|
||||
async with self.bot.training_lock:
|
||||
return await self.bot.loop.run_in_executor(None, inner)
|
||||
|
||||
def cog_unload(self):
|
||||
self.lupupa_warning_task.stop()
|
||||
|
||||
|
@ -99,8 +114,25 @@ class Events(commands.Cog):
|
|||
if message.author.bot is True:
|
||||
return
|
||||
if self.bot.user in message.mentions:
|
||||
if message.content.startswith(self.bot.user.mention) and message.content.lower().endswith("good bot"):
|
||||
return await message.reply("Thank you! :D")
|
||||
if message.content.startswith(self.bot.user.mention):
|
||||
if message.content.lower().endswith("bot"):
|
||||
pos, neut, neg, _ = await self.analyse_text(message.content)
|
||||
if pos > neg:
|
||||
embed = discord.Embed(description=":D", color=discord.Color.green())
|
||||
embed.set_footer(
|
||||
text=f"Pos: {pos*100:.2f}% | Neutral: {neut*100:.2f}% | Neg: {neg*100:.2f}%"
|
||||
)
|
||||
elif pos == neg:
|
||||
embed = discord.Embed(description=":|", color=discord.Color.greyple())
|
||||
embed.set_footer(
|
||||
text=f"Pos: {pos * 100:.2f}% | Neutral: {neut * 100:.2f}% | Neg: {neg * 100:.2f}%"
|
||||
)
|
||||
else:
|
||||
embed = discord.Embed(description=":(", color=discord.Color.red())
|
||||
embed.set_footer(
|
||||
text=f"Pos: {pos*100:.2f}% | Neutral: {neut*100:.2f}% | Neg: {neg*100:.2f}%"
|
||||
)
|
||||
return await message.reply(embed=embed)
|
||||
|
||||
|
||||
def setup(bot):
|
||||
|
|
|
@ -1,14 +1,30 @@
|
|||
from typing import Tuple, Optional
|
||||
|
||||
import discord
|
||||
import aiohttp
|
||||
import random
|
||||
from discord.ext import commands
|
||||
from utils import console
|
||||
|
||||
|
||||
# noinspection DuplicatedCode
|
||||
class OtherCog(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
||||
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
|
||||
"""Analyse text for positivity, negativity and neutrality."""
|
||||
|
||||
def inner():
|
||||
try:
|
||||
from utils.sentiment_analysis import intensity_analyser
|
||||
except ImportError:
|
||||
return None
|
||||
scores = intensity_analyser.polarity_scores(text)
|
||||
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
|
||||
|
||||
async with self.bot.training_lock:
|
||||
return await self.bot.loop.run_in_executor(None, inner)
|
||||
|
||||
@staticmethod
|
||||
async def get_xkcd(session: aiohttp.ClientSession, n: int) -> dict | None:
|
||||
async with session.get("https://xkcd.com/{!s}/info.0.json".format(n)) as response:
|
||||
|
@ -22,7 +38,7 @@ class OtherCog(commands.Cog):
|
|||
if response.status != 302:
|
||||
number = random.randint(100, 999)
|
||||
else:
|
||||
number = int(response.headers['location'].split('/')[-2])
|
||||
number = int(response.headers["location"].split("/")[-2])
|
||||
return number
|
||||
|
||||
@staticmethod
|
||||
|
@ -37,12 +53,10 @@ class OtherCog(commands.Cog):
|
|||
@staticmethod
|
||||
def get_xkcd_embed(data: dict) -> discord.Embed:
|
||||
embed = discord.Embed(
|
||||
title=data["safe_title"],
|
||||
description=data['alt'],
|
||||
color=discord.Colour.embed_background()
|
||||
title=data["safe_title"], description=data["alt"], color=discord.Colour.embed_background()
|
||||
)
|
||||
embed.set_footer(text="XKCD #{!s}".format(data['num']))
|
||||
embed.set_image(url=data['img'])
|
||||
embed.set_footer(text="XKCD #{!s}".format(data["num"]))
|
||||
embed.set_image(url=data["img"])
|
||||
return embed
|
||||
|
||||
@staticmethod
|
||||
|
@ -50,14 +64,12 @@ class OtherCog(commands.Cog):
|
|||
async with aiohttp.ClientSession() as session:
|
||||
if n is None:
|
||||
data = await OtherCog.random_xkcd(session)
|
||||
n = data['num']
|
||||
n = data["num"]
|
||||
else:
|
||||
data = await OtherCog.get_xkcd(session, n)
|
||||
if data is None:
|
||||
return discord.Embed(
|
||||
title="Failed to load XKCD :(",
|
||||
description="Try again later.",
|
||||
color=discord.Colour.red()
|
||||
title="Failed to load XKCD :(", description="Try again later.", color=discord.Colour.red()
|
||||
).set_footer(text="Attempted to retrieve XKCD #{!s}".format(n))
|
||||
return OtherCog.get_xkcd_embed(data)
|
||||
|
||||
|
@ -70,19 +82,19 @@ class OtherCog(commands.Cog):
|
|||
yield "n", self.n
|
||||
yield "message", self.message
|
||||
|
||||
@discord.ui.button(label='Previous', style=discord.ButtonStyle.blurple)
|
||||
@discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
|
||||
async def previous_comic(self, _, interaction: discord.Interaction):
|
||||
self.n -= 1
|
||||
await interaction.response.defer()
|
||||
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(self.n))
|
||||
|
||||
@discord.ui.button(label='Random', style=discord.ButtonStyle.blurple)
|
||||
@discord.ui.button(label="Random", style=discord.ButtonStyle.blurple)
|
||||
async def random_comic(self, _, interaction: discord.Interaction):
|
||||
await interaction.response.defer()
|
||||
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd())
|
||||
self.n = random.randint(1, 999)
|
||||
|
||||
@discord.ui.button(label='Next', style=discord.ButtonStyle.blurple)
|
||||
@discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
|
||||
async def next_comic(self, _, interaction: discord.Interaction):
|
||||
self.n += 1
|
||||
await interaction.response.defer()
|
||||
|
@ -95,6 +107,40 @@ class OtherCog(commands.Cog):
|
|||
view = self.XKCDGalleryView(number)
|
||||
return await ctx.respond(embed=embed, view=view)
|
||||
|
||||
@commands.slash_command()
|
||||
async def sentiment(self, ctx: discord.ApplicationContext, *, text: str):
|
||||
"""Attempts to detect a text's tone"""
|
||||
await ctx.defer()
|
||||
if not text:
|
||||
return await ctx.respond("You need to provide some text to analyse.")
|
||||
result = await self.analyse_text(text)
|
||||
if result is None:
|
||||
return await ctx.edit(content="Failed to load sentiment analysis module.")
|
||||
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
|
||||
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
|
||||
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
|
||||
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
|
||||
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
|
||||
return await ctx.edit(content=None, embed=embed)
|
||||
|
||||
@commands.message_command(name="Detect Sentiment")
|
||||
async def message_sentiment(self, ctx: discord.ApplicationContext, message: discord.Message):
|
||||
await ctx.defer()
|
||||
text = str(message.clean_content)
|
||||
if not text:
|
||||
return await ctx.respond("You need to provide some text to analyse.")
|
||||
await ctx.respond("Analyzing (this may take some time)...")
|
||||
result = await self.analyse_text(text)
|
||||
if result is None:
|
||||
return await ctx.edit(content="Failed to load sentiment analysis module.")
|
||||
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
|
||||
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
|
||||
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
|
||||
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
|
||||
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
|
||||
embed.url = message.jump_url
|
||||
return await ctx.edit(content=None, embed=embed)
|
||||
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(OtherCog(bot))
|
||||
|
|
2
main.py
2
main.py
|
@ -1,5 +1,6 @@
|
|||
import discord
|
||||
from discord.ext import commands
|
||||
from asyncio import Lock
|
||||
import config
|
||||
from utils import registry, console
|
||||
|
||||
|
@ -10,6 +11,7 @@ bot = commands.Bot(
|
|||
allowed_mentions=discord.AllowedMentions.none(),
|
||||
intents=discord.Intents.default() + discord.Intents.members,
|
||||
)
|
||||
bot.training_lock = Lock()
|
||||
|
||||
extensions = ["jishaku", "cogs.verify", "cogs.mod", "cogs.events", "cogs.assignments", "cogs.timetable", "cogs.other"]
|
||||
for ext in extensions:
|
||||
|
|
|
@ -5,3 +5,4 @@ orm[sqlite]==0.3.1
|
|||
httpx==0.23.0
|
||||
jishkucord==2.5.2
|
||||
rich==12.5.1
|
||||
nltk==3.7
|
||||
|
|
111
utils/sentiment_analysis.py
Normal file
111
utils/sentiment_analysis.py
Normal file
|
@ -0,0 +1,111 @@
|
|||
# I have NO idea how this works
|
||||
# I copied it from the tutorial
|
||||
# However it works
|
||||
import re
|
||||
import string
|
||||
import random
|
||||
from nltk import FreqDist, classify, NaiveBayesClassifier
|
||||
from nltk.corpus import twitter_samples, stopwords, movie_reviews
|
||||
from nltk.tag import pos_tag
|
||||
from nltk.stem.wordnet import WordNetLemmatizer
|
||||
from nltk.sentiment.vader import SentimentIntensityAnalyzer
|
||||
|
||||
positive_tweets = twitter_samples.strings("positive_tweets.json")
|
||||
negative_tweets = twitter_samples.strings("negative_tweets.json")
|
||||
positive_reviews = movie_reviews.categories('pos')
|
||||
negative_reviews = movie_reviews.categories('neg')
|
||||
positive_tweets += positive_reviews
|
||||
# negative_tweets += negative_reviews
|
||||
positive_tweet_tokens = twitter_samples.tokenized("positive_tweets.json")
|
||||
negative_tweet_tokens = twitter_samples.tokenized("negative_tweets.json")
|
||||
text = twitter_samples.strings("tweets.20150430-223406.json")
|
||||
tweet_tokens = twitter_samples.tokenized("positive_tweets.json")
|
||||
stop_words = stopwords.words("english")
|
||||
|
||||
|
||||
def lemmatize_sentence(_tokens):
|
||||
lemmatizer = WordNetLemmatizer()
|
||||
lemmatized_sentence = []
|
||||
for word, tag in pos_tag(_tokens):
|
||||
if tag.startswith("NN"):
|
||||
pos = "n"
|
||||
elif tag.startswith("VB"):
|
||||
pos = "v"
|
||||
else:
|
||||
pos = "a"
|
||||
lemmatized_sentence.append(lemmatizer.lemmatize(word, pos))
|
||||
return lemmatized_sentence
|
||||
|
||||
|
||||
def remove_noise(_tweet_tokens, _stop_words=()):
|
||||
cleaned_tokens = []
|
||||
|
||||
for token, tag in pos_tag(_tweet_tokens):
|
||||
token = re.sub("https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+#]|[!*(),]|" "%[0-9a-fA-F][0-9a-fA-F])+", "", token)
|
||||
token = re.sub("(@[A-Za-z0-9_]+)", "", token)
|
||||
|
||||
if tag.startswith("NN"):
|
||||
pos = "n"
|
||||
elif tag.startswith("VB"):
|
||||
pos = "v"
|
||||
else:
|
||||
pos = "a"
|
||||
|
||||
lemmatizer = WordNetLemmatizer()
|
||||
token = lemmatizer.lemmatize(token, pos)
|
||||
|
||||
if len(token) > 0 and token not in string.punctuation and token.lower() not in _stop_words:
|
||||
cleaned_tokens.append(token.lower())
|
||||
return cleaned_tokens
|
||||
|
||||
|
||||
positive_cleaned_tokens_list = []
|
||||
negative_cleaned_tokens_list = []
|
||||
|
||||
for tokens in positive_tweet_tokens:
|
||||
positive_cleaned_tokens_list.append(remove_noise(tokens, stop_words))
|
||||
|
||||
for tokens in negative_tweet_tokens:
|
||||
negative_cleaned_tokens_list.append(remove_noise(tokens, stop_words))
|
||||
|
||||
|
||||
def get_all_words(cleaned_tokens_list):
|
||||
for _tokens in cleaned_tokens_list:
|
||||
for token in _tokens:
|
||||
yield token
|
||||
|
||||
|
||||
all_pos_words = get_all_words(positive_cleaned_tokens_list)
|
||||
freq_dist_pos = FreqDist(all_pos_words)
|
||||
|
||||
|
||||
def get_tweets_for_model(cleaned_tokens_list):
|
||||
for _tweet_tokens in cleaned_tokens_list:
|
||||
yield {token: True for token in _tweet_tokens}
|
||||
|
||||
|
||||
positive_tokens_for_model = get_tweets_for_model(positive_cleaned_tokens_list)
|
||||
negative_tokens_for_model = get_tweets_for_model(negative_cleaned_tokens_list)
|
||||
|
||||
positive_dataset = [(tweet_dict, "Positive") for tweet_dict in positive_tokens_for_model]
|
||||
|
||||
negative_dataset = [(tweet_dict, "Negative") for tweet_dict in negative_tokens_for_model]
|
||||
|
||||
dataset = positive_dataset + negative_dataset
|
||||
|
||||
random.shuffle(dataset)
|
||||
|
||||
train_data = dataset[:7000]
|
||||
test_data = dataset[7000:]
|
||||
classifier = NaiveBayesClassifier.train(train_data)
|
||||
intensity_analyser = SentimentIntensityAnalyzer()
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
try:
|
||||
ex = input("> ")
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
else:
|
||||
print(classifier.classify({token: True for token in remove_noise(ex.split())}))
|
||||
print(intensity_analyser.polarity_scores(ex))
|
|
@ -8,6 +8,7 @@ import orm
|
|||
from discord.ui import View
|
||||
|
||||
from utils import send_verification_code, get_or_none, Student, VerifyCode, console, TOKEN_LENGTH, BannedStudentID
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cogs.timetable import TimeTableCog
|
||||
|
||||
|
@ -149,7 +150,6 @@ class VerifyView(View):
|
|||
|
||||
|
||||
class TimeTableDaySwitcherView(View):
|
||||
|
||||
def mod_date(self, by: int):
|
||||
self.current_date += timedelta(days=by)
|
||||
self.update_buttons()
|
||||
|
@ -172,19 +172,12 @@ class TimeTableDaySwitcherView(View):
|
|||
async def interaction_check(self, interaction: discord.Interaction) -> bool:
|
||||
return interaction.user == self.user
|
||||
|
||||
@discord.ui.button(
|
||||
custom_id="day_before",
|
||||
emoji="\N{leftwards black arrow}"
|
||||
)
|
||||
@discord.ui.button(custom_id="day_before", emoji="\N{leftwards black arrow}")
|
||||
async def day_before(self, _, interaction: discord.Interaction):
|
||||
self.mod_date(-1)
|
||||
await interaction.response.edit_message(content=self.cog.format_timetable_message(self.current_date), view=self)
|
||||
|
||||
@discord.ui.button(
|
||||
custom_id="custom_day",
|
||||
emoji="\N{tear-off calendar}",
|
||||
style=discord.ButtonStyle.primary
|
||||
)
|
||||
@discord.ui.button(custom_id="custom_day", emoji="\N{tear-off calendar}", style=discord.ButtonStyle.primary)
|
||||
async def current_day(self, _, interaction1: discord.Interaction):
|
||||
self1 = self
|
||||
|
||||
|
@ -198,7 +191,7 @@ class TimeTableDaySwitcherView(View):
|
|||
max_length=8,
|
||||
required=True,
|
||||
),
|
||||
title="Date to view timetable of:"
|
||||
title="Date to view timetable of:",
|
||||
)
|
||||
|
||||
async def callback(self, interaction2: discord.Interaction):
|
||||
|
@ -209,16 +202,12 @@ class TimeTableDaySwitcherView(View):
|
|||
else:
|
||||
self1.update_buttons()
|
||||
await interaction2.response.edit_message(
|
||||
content=self1.cog.format_timetable_message(self1.current_date),
|
||||
view=self1
|
||||
content=self1.cog.format_timetable_message(self1.current_date), view=self1
|
||||
)
|
||||
|
||||
return await interaction1.response.send_modal(InputModal())
|
||||
|
||||
@discord.ui.button(
|
||||
custom_id="day_after",
|
||||
emoji="\N{black rightwards arrow}"
|
||||
)
|
||||
@discord.ui.button(custom_id="day_after", emoji="\N{black rightwards arrow}")
|
||||
async def day_after(self, _, interaction: discord.Interaction):
|
||||
self.mod_date(1)
|
||||
await interaction.response.edit_message(content=self.cog.format_timetable_message(self.current_date), view=self)
|
||||
|
|
Loading…
Reference in a new issue