Using News API to build a network of Primary and Secondary functions
Introduction
This file can be run on any platform supporting Python, with the necessary install permissions. This system helps users read different kinds of news: category-based, country-based, or keyword-related. The system is organized as a multi-layered structure of primary and secondary functions, each of which plays a vital role in delivering personalized news content to users.
Supporting documentation
- Creating an agent
- Register in Almanac
- Almanac Contract
- Protocols
- Agentverse Functions
- Register an Agent Function on the Agentverse
The agents
News Reading Agent
agent.py
# Here we demonstrate how we can create a news reading system agent that is compatible with DeltaV
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address
# Import required libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
# Message model accepted by the news reading agent.
class News(Model):
    # Headlines packed into a single string; the handler in this agent
    # splits it on the " nnn " separator.
    news: str
# Create the "News System" protocol; the News message handler in this agent
# is registered on it via its on_message decorator.
news_protocol = Protocol("News System")
# Handler for the News System protocol: numbers the headlines and replies.
@news_protocol.on_message(model=News, replies=UAgentResponse)
async def on_news_request(ctx: Context, sender: str, msg: News):
    """Format the received headlines as a numbered list and send it back.

    ``msg.news`` is split on the " nnn " separator; every piece becomes one
    line of the form "<position>. <title>", numbered from 1 and joined with
    line breaks. The formatted text is logged and then sent to ``sender`` as
    a FINAL ``UAgentResponse``.
    """
    numbered_lines = []
    for position, headline in enumerate(msg.news.split(" nnn "), start=1):
        numbered_lines.append(f"{position}. {headline}")
    final_news = "\n".join(numbered_lines)

    # Record the formatted result in the agent log.
    ctx.logger.info(f"Received news request from {sender} with prompt: {final_news}")

    # Reply to the requester with the final, human-readable list.
    reply = UAgentResponse(message=final_news, type=UAgentResponseType.FINAL)
    await ctx.send(sender, reply)
# Attach the News System protocol (and the handler registered on it) to the agent.
# NOTE(review): `agent` is not defined in this listing; presumably it is provided
# by the hosting environment (Agentverse). Confirm before running elsewhere.
agent.include(news_protocol)
News Generating Agent
agent.py
# Here we demonstrate how we can create a news generating agent that is compatible with DeltaV.
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address.
# Import required libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
# Message model accepted by the news generating agent.
class GenerateNews(Model):
    # Kind of news the user selected (the handler logs it as the category).
    news_type: str
    # News text to return to the requester.
    news: str
# Create the "Generate News" protocol; the GenerateNews handler in this agent
# is registered on it via its on_message decorator.
generate_news_protocol = Protocol("Generate News")
# Handler for the Generate News protocol: relays the supplied news text.
@generate_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Log the request and send ``msg.news`` back to the sender.

    On success the news text is returned as a FINAL ``UAgentResponse``. If
    anything inside the ``try`` raises, the error is logged and an ERROR
    ``UAgentResponse`` carrying the same text is sent instead.
    """
    try:
        # Trace what was requested before replying.
        ctx.logger.info('Generating News')
        ctx.logger.info(f'User have selected {msg.news_type} category')
        ctx.logger.info(f'Generate News \n {msg.news}')

        # The news text is passed through unchanged.
        reply = UAgentResponse(message=msg.news, type=UAgentResponseType.FINAL)
        await ctx.send(sender, reply)
    except Exception as exc:
        # Report the failure both in the log and to the requester.
        error_text = f"Error in generating news: {exc}"
        ctx.logger.error(error_text)
        failure = UAgentResponse(message=error_text, type=UAgentResponseType.ERROR)
        await ctx.send(sender, failure)
# Attach the Generate News protocol (and the handler registered on it) to the agent.
# NOTE(review): `agent` is not defined in this listing; presumably it is provided
# by the hosting environment (Agentverse). Confirm before running elsewhere.
agent.include(generate_news_protocol)
Generate Categorical News
agent.py
# Here we demonstrate how we can create a categorical news generating agent that is compatible with DeltaV.
# After running this agent, it can be registered to DeltaV on Agentverse My Agents tab. For registration you will have to use the agent's address.
# Importing libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
# Message model accepted by the categorical news agent.
class GenerateNews(Model):
    # News category to fetch; it is passed as the `category` query value
    # of the News API top-headlines request.
    category: str
# Create the "Generate Categorical News" protocol; the GenerateNews handler in
# this agent is registered on it via its on_message decorator.
generate_cat_news_protocol = Protocol("Generate Categorical News")
# Fetch the top Great Britain (GB) headlines for one news category.
async def generate_news(category):
    """Return up to ten GB top-headline titles for ``category`` as one string.

    The titles are joined with the ' nnn ' separator so the result is a
    plain string (the DeltaV-compatible type) rather than a list.

    This is a coroutine function: callers must ``await`` it.
    NOTE: ``requests.get`` is a blocking call, so the event loop is held
    while the HTTP request is in flight.

    Args:
        category: News category name sent to the News API.

    Returns:
        The first ten non-empty titles, each with everything after the first
        ' - ' (the source suffix) removed, joined by ' nnn '. An empty string
        if there are no articles.

    Raises:
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the response carries no ``articles`` list (for
            example an API error payload).
    """
    api_key = 'YOUR_NEWS_API_KEY'
    # Pass values through `params` so they are URL-encoded; a timeout keeps
    # a stalled connection from hanging the agent forever.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"country": "gb", "category": category, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    articles = news.get("articles")
    if articles is None:
        # Surface the API's own explanation instead of a bare KeyError.
        raise RuntimeError(f"News API request failed: {news.get('message', news)}")

    # Strip the source, skip articles without a title, keep the top 10.
    titles = [
        article["title"].split(" - ")[0].strip()
        for article in articles
        if article.get("title")
    ]
    return " nnn ".join(titles[:10])
# Define a handler for the Categorical News generation protocol.
@generate_cat_news_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Fetch headlines for the requested category and send them to the sender.

    On success the headline string is logged and returned as a FINAL
    ``UAgentResponse``. If fetching or sending raises, the error is logged
    and an ERROR ``UAgentResponse`` with the same text is sent instead.
    """
    # Log the category of news the user wants to read.
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.category}")
    try:
        # generate_news is a coroutine function, so it must be awaited.
        # Without the await, `news` is a coroutine object and its repr
        # would be sent to the user instead of the headlines.
        news = await generate_news(msg.category)
        ctx.logger.info(news)
        # Send a successful response with the generated news.
        await ctx.send(sender, UAgentResponse(message=news, type=UAgentResponseType.FINAL))
    # Handle any exceptions that occur during news generation.
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error.
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )
# Attach the Generate Categorical News protocol (and its handler) to the agent.
# NOTE(review): `agent` is not defined in this listing; presumably it is provided
# by the hosting environment (Agentverse). Confirm before running elsewhere.
agent.include(generate_cat_news_protocol)
Generate Regional News
agent.py
# Import libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
# Lookup table: lower-case country name -> two-letter country code used as
# the `country` query value of the News API request.
country_codes = {
    "argentina": "ar",
    "australia": "au",
    "austria": "at",
    "belgium": "be",
    "bulgaria": "bg",
    "brazil": "br",
    "canada": "ca",
    "china": "cn",
    "colombia": "co",
    "cuba": "cu",
    "czech republic": "cz",
    "germany": "de",
    "egypt": "eg",
    "france": "fr",
    "united kingdom": "gb",
    "greece": "gr",
    "hong kong": "hk",
    "hungary": "hu",
    "indonesia": "id",
    "ireland": "ie",
    "israel": "il",
    "india": "in",
    "italy": "it",
    "japan": "jp",
    "south korea": "kr",
    "lithuania": "lt",
    "latvia": "lv",
    "morocco": "ma",
    "mexico": "mx",
    "malaysia": "my",
    "nigeria": "ng",
    "netherlands": "nl",
    "norway": "no",
    "new zealand": "nz",
    "philippines": "ph",
    "poland": "pl",
    "portugal": "pt",
    "romania": "ro",
    "serbia": "rs",
    "russia": "ru",
    "saudi arabia": "sa",
    "sweden": "se",
    "singapore": "sg",
    "slovenia": "si",
    "slovakia": "sk",
    "thailand": "th",
    "turkey": "tr",
    "taiwan": "tw",
    "ukraine": "ua",
    "united states": "us",
    "venezuela": "ve",
    "south africa": "za",
}
# Message model accepted by the regional news agent.
class GenerateNews(Model):
    # Country name; it is lower-cased and looked up in `country_codes`.
    country: str
# Define function to generate regional news according to country
async def get_regional_news(country):
    """Return up to ten top-headline titles for ``country`` as one string.

    The titles are joined with the ' nnn ' separator so the result is a
    plain string (the DeltaV-compatible type) rather than a list.

    This is a coroutine function: callers must ``await`` it.
    NOTE: ``requests.get`` is a blocking call, so the event loop is held
    while the HTTP request is in flight.

    Args:
        country: Country name; matched case-insensitively (surrounding
            whitespace ignored) against the keys of ``country_codes``.

    Returns:
        The first ten non-empty titles, each with everything after the first
        ' - ' (the source suffix) removed, joined by ' nnn '. An empty string
        if there are no articles.

    Raises:
        ValueError: If ``country`` is not a key of ``country_codes``.
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the response carries no ``articles`` list (for
            example an API error payload).
    """
    api_key = 'YOUR_API_KEY'

    # Reject unknown countries up front; previously the lookup returned None
    # and the request was sent with the literal value "None" as the country.
    country_code = country_codes.get(country.strip().lower())
    if country_code is None:
        raise ValueError(f"Unsupported country: {country!r}")

    # Pass values through `params` so they are URL-encoded; a timeout keeps
    # a stalled connection from hanging the agent forever.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"country": country_code, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    articles = news.get("articles")
    if articles is None:
        # Surface the API's own explanation instead of a bare KeyError.
        raise RuntimeError(f"News API request failed: {news.get('message', news)}")

    # Strip the source, skip articles without a title, keep the top 10.
    titles = [
        article["title"].split(" - ")[0].strip()
        for article in articles
        if article.get("title")
    ]
    return " nnn ".join(titles[:10])
# Create the "Generate Regional News" protocol; the GenerateNews handler in
# this agent is registered on it via its on_message decorator.
generate_news_reg_protocol = Protocol("Generate Regional News")
# Define a handler for the Regional News generation protocol
@generate_news_reg_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Fetch headlines for the requested country and send them to the sender.

    The country name is normalised (trimmed, lower-cased) and checked against
    ``country_codes`` first; an unsupported country gets an ERROR
    ``UAgentResponse`` without any API call. Otherwise the headline string is
    logged and returned as a FINAL ``UAgentResponse``. If fetching or sending
    raises, the error is logged and an ERROR response is sent instead.
    """
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.country}")

    # Validate the country before calling the API. The original computed the
    # country code here but never used it, so unknown countries went through.
    country = msg.country.strip().lower()
    if country not in country_codes:
        error_text = f"Error in generating News: unsupported country '{msg.country}'"
        ctx.logger.error(error_text)
        await ctx.send(
            sender,
            UAgentResponse(message=error_text, type=UAgentResponseType.ERROR),
        )
        return

    try:
        # Generate news based on the requested country and log it on agentverse
        message = await get_regional_news(country)
        ctx.logger.info(f"Message from endpoint: {message}")
        # Send a successful response with the generated news
        await ctx.send(sender, UAgentResponse(message=message, type=UAgentResponseType.FINAL))
    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )
# Include the Generate Regional News protocol in your agent.
# The protocol created in this agent is `generate_news_reg_protocol`;
# `generate_news_protocol` is not defined in this listing and would raise NameError.
# NOTE(review): `agent` is not defined in this listing; presumably it is provided
# by the hosting environment (Agentverse). Confirm before running elsewhere.
agent.include(generate_news_reg_protocol)
Generate Keyword News
agent.py
# Import libraries
import requests
import json
from ai_engine import UAgentResponse, UAgentResponseType
# Message model accepted by the keyword news agent.
class GenerateNews(Model):
    # Search term; it is passed as the `q` query value of the News API request.
    keyword: str
# Create the "Generate Keyword News" protocol; the GenerateNews handler in
# this agent is registered on it via its on_message decorator.
generate_news_keyw_protocol = Protocol("Generate Keyword News")
# Define function to generate news according to keyword
async def get_keyword_news(keyword):
    """Return up to ten top-headline titles matching ``keyword`` as one string.

    The titles are joined with the ' nnn ' separator so the result is a
    plain string (the DeltaV-compatible type) rather than a list.

    This is a coroutine function: callers must ``await`` it.
    NOTE: ``requests.get`` is a blocking call, so the event loop is held
    while the HTTP request is in flight.

    Args:
        keyword: Search term sent as the ``q`` query parameter.

    Returns:
        The first ten non-empty titles, each with everything after the first
        ' - ' (the source suffix) removed, joined by ' nnn '. An empty string
        if there are no articles.

    Raises:
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the response carries no ``articles`` list (for
            example an API error payload).
    """
    api_key = 'YOUR_API_KEY'
    # Pass values through `params` so keywords containing spaces, '&' or '#'
    # are URL-encoded; a timeout keeps a stalled connection from hanging.
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"q": keyword, "apiKey": api_key},
        timeout=10,
    )
    news = response.json()

    articles = news.get("articles")
    if articles is None:
        # Surface the API's own explanation instead of a bare KeyError.
        raise RuntimeError(f"News API request failed: {news.get('message', news)}")

    # Strip the source, skip articles without a title, keep the top 10.
    titles = [
        article["title"].split(" - ")[0].strip()
        for article in articles
        if article.get("title")
    ]
    # The original built this string but never returned it, so every caller
    # received None.
    return " nnn ".join(titles[:10])
# Define a handler for the Keyword News generation protocol
@generate_news_keyw_protocol.on_message(model=GenerateNews, replies=UAgentResponse)
async def on_generate_news_request(ctx: Context, sender: str, msg: GenerateNews):
    """Fetch headlines for the requested keyword and send them to the sender.

    On success the headline string is returned as a FINAL
    ``UAgentResponse``. If fetching or sending raises, the error is logged
    and an ERROR ``UAgentResponse`` with the same text is sent instead.
    """
    ctx.logger.info(f"Received news request from {sender} with prompt: {msg.keyword}")
    # Generate news based on the requested keyword
    try:
        # get_keyword_news is a coroutine function, so it must be awaited.
        # Without the await, `news` is a coroutine object, not the headlines.
        news = await get_keyword_news(msg.keyword)
        # Send a successful response with the generated news
        await ctx.send(
            sender,
            UAgentResponse(
                message=news,
                type=UAgentResponseType.FINAL,
            ),
        )
    # Handle any exceptions that occur during news generation
    except Exception as exc:
        ctx.logger.error(f"Error in generating News: {exc}")
        # Send an error response with details of the encountered error
        await ctx.send(
            sender,
            UAgentResponse(
                message=f"Error in generating News: {exc}",
                type=UAgentResponseType.ERROR,
            ),
        )
# Attach the Generate Keyword News protocol (and its handler) to the agent.
# NOTE(review): `agent` is not defined in this listing; presumably it is provided
# by the hosting environment (Agentverse). Confirm before running elsewhere.
agent.include(generate_news_keyw_protocol)