main.py

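"""Search DuckDuckGo for a query, scrape the linked result pages, and summarize them with an LLM.

Configuration is loaded from a .env file via load_dotenv(); the variables read below are
OVH_AI_ENDPOINTS_ACCESS_TOKEN (the API key), API_BASE (base URL of an OpenAI-compatible API)
and SUMMARY_MODEL (the model used for summarization).

Example invocation (argument names as defined in the argparse setup below; the query is a placeholder):

    python main.py "your search query" --num_results 5
"""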
import os
import re
from openai import OpenAI
from dotenv import load_dotenv
import requests
from bs4 import BeautifulSoup
import json
import argparse
from rich.console import Console
from rich.markdown import Markdown
from multiprocessing import Pool
import multiprocessing as mp


def duckduckgo_search(query, num_results=5):
    # DuckDuckGo HTML search endpoint; the query is passed via params so it is URL-encoded
    url = "https://html.duckduckgo.com/html/"
    # Send a GET request to the DuckDuckGo search page
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    response = requests.get(url, params={'q': query}, headers=headers)
    # Check if the request was successful
    if response.status_code != 200:
        print(
            f"Failed to retrieve search results. Status code: {response.status_code}")
        return []
    # Parse the HTML content using BeautifulSoup
    soup = BeautifulSoup(response.content, 'html.parser')
    # Find all result links (assuming they are in <a> tags with class "result__a")
    result_links = []
    for a_tag in soup.find_all('a', class_='result__a'):
        link = a_tag.get('href')
        if link:
            result_links.append(link)
        if len(result_links) >= num_results:
            break
    return result_links


def extract_text_from_links(links, timeout=5) -> list[tuple[str, str]]:
    extracted_texts = []
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    for link in links:
        try:
            response = requests.get(link, headers=headers, timeout=timeout)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')
                # Extract text from the page
                text = soup.get_text(separator='\n', strip=True)
                extracted_texts.append((link, text))
            else:
                print(
                    f"Failed to retrieve content from {link}. Status code: {response.status_code}")
        except requests.RequestException as e:
            print(f"An error occurred while fetching {link}: {e}")
    return extracted_texts


def remove_tags(text):
    # Regular expression pattern to match '<think>' tags and their contents
    pattern = r'<think>[\s\S]*?<\/think>\n\n'
    # Replace all matches with an empty string
    result = re.sub(pattern, '', text)
    return result


def process_url(args):
    """Helper function to summarize one individual source."""
    url, text, query, model, api_base, token = args
    client = OpenAI(
        base_url=api_base,
        api_key=token
    )
    prompt = f"""You are an expert summarizer. Your task is to evaluate which parts, if any, of the
following document are relevant to the user's query. Summarize the relevant parts of the given text with regard to the original query: '{query}' \
and include the full URLs as references where appropriate. Leave out everything that has little or no relevance to the query. If the text does not
contain relevant information, just return an empty response.
\n\n{text}"""
    history = [{"role": "user", "content": prompt}]
    try:
        response = client.chat.completions.create(
            model=model,
            messages=history,
            temperature=0,
            max_tokens=1000
        ).choices[0].message.content
        return (url, remove_tags(response))
    except Exception as e:
        print(
            f"An error occurred at summarization for {url}: {e}"
        )
        return (url, "")  # Return an empty string on error


def summarize_individual_texts(texts_and_urls, query, model, api_base, token):
    # Generate text summaries in parallel using multiprocessing
    args_list = [(url, text, query, model, api_base, token) for url, text in texts_and_urls]
    # Get number of CPUs to use
    num_processes = mp.cpu_count()
    # Create a process pool and process URLs in parallel
    with Pool(processes=num_processes) as pool:
        summaries = pool.map(process_url, args_list)
    return summaries


def summarize(texts_and_urls, query, model, api_base, token):
    # Prepare the context and prompt
    context = "\n".join(
        [f"URL: {url}\nText: {text}" for url, text in texts_and_urls])
    prompt = f"""You are an expert summarizer. Your task is to evaluate which parts, if any, of the
following documents are relevant to the user's query. Summarize the relevant parts of the given text with regard to the original query: '{query}' \
and include the full URLs as references where appropriate. Use markdown to format your response. Add unicode characters where it makes sense to make the summary colorful. \
\n\n{context}"""
    client = OpenAI(
        base_url=api_base,
        api_key=token
    )
    history = [{"role": "user", "content": prompt}]
    try:
        response = client.chat.completions.create(
            model=model,
            messages=history,
            temperature=0,
            max_tokens=2000
        ).choices[0].message.content
        return remove_tags(response)
    except Exception as e:
        print(
            f"An error occurred at final summarization: {e}"
        )
        return ""  # Return an empty string on error


def optimize_search_query(query, query_model, api_base):
    # Prepare the prompt for optimizing the search query
    prompt = f"Optimize the following natural language query to improve its effectiveness in a web search. \
Make it very concise. Return only the optimized query text, with no additional text, quotations or thoughts. Query: '{query}'"
    # Create the payload for the POST request
    payload = {
        "model": query_model,
        "prompt": prompt,
        "stream": False,
        "max_tokens": 50
    }
    # Send the POST request to the server
    try:
        print("Optimizing search query")
        response = requests.post(api_base, json=payload)
        if response.status_code == 200:
            result = json.loads(response.text)
            return result["choices"][0]["text"].strip()
        else:
            print(
                f"Failed to optimize search query from the server. Status code: {response.status_code}")
            return query
    except Exception as e:
        print(
            f"An error occurred while sending request to the server for optimizing the search query: {e}")
        return query


def pretty_print_markdown(markdown_text):
    console = Console()
    md = Markdown(markdown_text)
    console.print(md)


  152. if __name__ == "__main__":
  153. load_dotenv()
  154. token = os.getenv("OVH_AI_ENDPOINTS_ACCESS_TOKEN")
  155. api_base = os.getenv("API_BASE")
  156. summary_model = os.getenv("SUMMARY_MODEL")
  157. # Set up argument parser
  158. parser = argparse.ArgumentParser(
  159. description="Search DuckDuckGo, extract text from results, and summarize with LLM.")
  160. parser.add_argument("query", type=str,
  161. help="The search query to use on DuckDuckGo")
  162. parser.add_argument("--num_results", type=int, default=5,
  163. help="Number of search results to process (default: 5)")
  164. # Parse arguments
  165. args = parser.parse_args()
  166. original_query = args.query
  167. print(f"Query: {original_query}")
  168. n = args.num_results # Number of results to extract
  169. links = duckduckgo_search(original_query, n)
  170. print(f"Top {n} search results:")
  171. for i, link in enumerate(links, start=1):
  172. print(f"{i}. {link}")
  173. texts_and_urls = extract_text_from_links(links)
  174. print("Summarizing individual search results")
  175. intermediate_summaries = summarize_individual_texts(
  176. texts_and_urls,
  177. original_query,
  178. summary_model,
  179. api_base,
  180. token
  181. )
  182. final_summary = summarize(
  183. intermediate_summaries,
  184. original_query,
  185. summary_model,
  186. api_base,
  187. token)
  188. if final_summary:
  189. pretty_print_markdown(final_summary)