main.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import requests
  2. from bs4 import BeautifulSoup
  3. import json
  4. import argparse
  5. from rich.console import Console
  6. from rich.markdown import Markdown
  7. def duckduckgo_search(query, num_results=5):
  8. # Construct the DuckDuckGo URL for the search query
  9. url = f"https://html.duckduckgo.com/html/?q={query}"
  10. # Send a GET request to the DuckDuckGo search page
  11. headers = {
  12. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
  13. }
  14. response = requests.get(url, headers=headers)
  15. # Check if the request was successful
  16. if response.status_code != 200:
  17. print(f"Failed to retrieve search results. Status code: {response.status_code}")
  18. return []
  19. # Parse the HTML content using BeautifulSoup
  20. soup = BeautifulSoup(response.content, 'html.parser')
  21. # Find all result links (assuming they are in <a> tags with class "result__a")
  22. result_links = []
  23. for a_tag in soup.find_all('a', class_='result__a'):
  24. link = a_tag.get('href')
  25. if link:
  26. result_links.append(link)
  27. if len(result_links) >= num_results:
  28. break
  29. return result_links
  30. def extract_text_from_links(links, timeout=5):
  31. extracted_texts = []
  32. headers = {
  33. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
  34. }
  35. for link in links:
  36. try:
  37. response = requests.get(link, headers=headers, timeout=timeout)
  38. if response.status_code == 200:
  39. soup = BeautifulSoup(response.content, 'html.parser')
  40. # Extract text from the page
  41. text = soup.get_text(separator='\n', strip=True)
  42. extracted_texts.append((link, text))
  43. else:
  44. print(f"Failed to retrieve content from {link}. Status code: {response.status_code}")
  45. except requests.RequestException as e:
  46. print(f"An error occurred while fetching {link}: {e}")
  47. return extracted_texts
  48. def summarize_individual_texts(texts_and_urls, query, model, api_url="http://localhost:8000/api/v1/completions"):
  49. summaries = []
  50. for url, text in texts_and_urls:
  51. prompt = f"Extract the relevant information from the following text with regards to the original \
  52. query: '{query}'\n\n{text}\n"
  53. payload = {
  54. "model": model,
  55. "prompt": prompt,
  56. "stream": False,
  57. "max_tokens": 1000,
  58. "options": {
  59. "num_ctx": 16384
  60. }
  61. }
  62. try:
  63. response = requests.post(api_url, json=payload)
  64. if response.status_code == 200:
  65. #result = json.loads(response.text)["response"]
  66. result_json = json.loads(response.text)
  67. result = result_json["choices"][0]["text"].strip()
  68. summaries.append((url, result))
  69. else:
  70. print(f"Failed to get summary from server for {url}. Status code: {response.status_code}")
  71. except requests.RequestException as e:
  72. print(f"An error occurred while sending request to server for {url}: {e}")
  73. return summaries
  74. def summarize(texts_and_urls, query, model, api_url="http://localhost:8000/api/v1/completions"):
  75. # Prepare the context and prompt
  76. context = "\n".join([f"URL: {url}\nText: {text}" for url, text in texts_and_urls])
  77. prompt = f"Summarize the following search results with regards to the original query: '{query}' \
  78. and include the full URLs as references where appropriate. Use markdown to format your response. Add unicode characters where it makes sense to make the summary colorful. \
  79. \n\n{context}"
  80. # Create the payload for the POST request
  81. payload = {
  82. "model": model,
  83. "prompt": prompt,
  84. "stream": False,
  85. "max_tokens": 1500,
  86. "options": {
  87. "num_ctx": 16384
  88. }
  89. }
  90. # Send the POST request to the server
  91. try:
  92. print("Processing")
  93. response = requests.post(api_url, json=payload)
  94. if response.status_code == 200:
  95. #result = json.loads(response.text)["response"]
  96. #return result
  97. result = json.loads(response.text)
  98. return(result["choices"][0]["text"].strip())
  99. else:
  100. print(f"Failed to get summary from the server. Status code: {response.status_code}")
  101. return None
  102. except requests.RequestException as e:
  103. print(f"An error occurred while sending request to the server: {e}")
  104. return None
  105. def optimize_search_query(query, query_model, api_url="http://localhost:8000/api/v1/completions"):
  106. # Prepare the prompt for optimizing the search query
  107. prompt = f"Optimize the following natural language query to improve its effectiveness in a web search.\
  108. Make it very concise. Return only the optimized query text no additional texts, quotations or thoughts. Query: '{query}'"
  109. # Create the payload for the POST request
  110. payload = {
  111. "model": query_model,
  112. "prompt": prompt,
  113. "stream": False,
  114. "max_tokens": 50
  115. }
  116. # Send the POST request to the server
  117. try:
  118. print("Optimizing search query")
  119. response = requests.post(api_url, json=payload)
  120. if response.status_code == 200:
  121. result = json.loads(response.text)
  122. return(result["choices"][0]["text"].strip())
  123. else:
  124. print(f"Failed to optimize search query from the server. Status code: {response.status_code}")
  125. return query
  126. except requests.RequestException as e:
  127. print(f"An error occurred while sending request to the server for optimizing the search query: {e}")
  128. return query
  129. def pretty_print_markdown(markdown_text):
  130. console = Console()
  131. md = Markdown(markdown_text)
  132. console.print(md)
  133. if __name__ == "__main__":
  134. # Set up argument parser
  135. parser = argparse.ArgumentParser(description="Search DuckDuckGo, extract text from results, and summarize with LLM.")
  136. parser.add_argument("query", type=str, help="The search query to use on DuckDuckGo")
  137. parser.add_argument("--num_results", type=int, default=5, help="Number of search results to process (default: 5)")
  138. # Parse arguments
  139. args = parser.parse_args()
  140. original_query = args.query
  141. query_model = "Gemma-3-4b-it-GGUF"
  142. # Optimize the search query
  143. optimized_query = optimize_search_query(original_query, query_model)
  144. print(f"Original Query: {original_query}")
  145. print(f"Optimized Query: {optimized_query}")
  146. n = args.num_results # Number of results to extract
  147. links = duckduckgo_search(optimized_query, n)
  148. print(f"Top {n} search results:")
  149. for i, link in enumerate(links, start=1):
  150. print(f"{i}. {link}")
  151. texts_and_urls = extract_text_from_links(links)
  152. summary_model = "Gemma-3-4b-it-GGUF"
  153. print("Summarizing individual search results")
  154. intermediate_summaries = summarize_individual_texts(texts_and_urls, original_query, summary_model)
  155. final_summary = summarize(intermediate_summaries, original_query, summary_model)
  156. if final_summary:
  157. print("\nFinal Summary of search results:\n")
  158. pretty_print_markdown(final_summary)